用 docker-compose 部署 MongoDB Replica Set 开发环境
Aug 4, 2019
4 minute read

前言

话不多说,直接进入正题。

如何搭建和配置

把以下内容写入 docker-compose.yml 文件中。然后运行 docker-compose up -d --scale mongo=3,就可以启动三个 MongoDB 服务。接下来我们要把这三个 MongoDB 节点配置成一组 Replica Set

version: '3.5'
services:
  mongo:
    # Pinned on purpose: the init script pipes init.js into the legacy
    # `mongo` shell, which newer images (6.0+) no longer ship. The sample
    # output in this article was produced with a 4.0.x server.
    image: mongo:4.0
    ports:
      # A host port range, so that every scaled replica can publish its
      # container port 27017 on a distinct host port.
      - "27017-27118:27017"
    # Start every node as a member of the replica set named "main".
    command: mongod --replSet main

把以下内容写入 utils.py 文件中

import subprocess as sp


def get_node_ports():
    """Return the host ports published for the scaled ``mongo`` service.

    Asks ``docker-compose port`` for the host binding of container port
    27017, one container index at a time starting at 1, and stops at the
    first index for which the command fails.

    Returns:
        list[int]: published host ports, in container-index order.

    Raises:
        RuntimeError: if not even the first index yields a port.
        ValueError: if the command output does not end in a numeric port.
    """
    ports = []
    idx = 1
    while True:
        try:
            output = sp.check_output(
                f"docker-compose port --index {idx} mongo 27017",
                shell=True,
                encoding="utf-8",
                stderr=sp.DEVNULL,
            )
        except sp.CalledProcessError:
            # A failing lookup is taken to mean there is no container
            # with this index, i.e. every node has been enumerated.
            break

        # The output has the form "<host>:<port>" (e.g. "0.0.0.0:27025").
        # Split from the right so a host part that itself contains
        # colons (an IPv6 binding) cannot shift the port field.
        port = output.strip().rsplit(":", 1)[-1]
        ports.append(int(port))
        idx += 1

    if not ports:
        raise RuntimeError("no node is running.")

    return ports

把以下内容写入 init.py 文件中,再执行 python init.py 命令配置这组 Replica Set

import json
import subprocess as sp
import tempfile

from utils import get_node_ports


def generate_init_js(node_ports):
    """Build the shell script that initiates the ``main`` replica set.

    Each port becomes one member reachable at
    ``host.docker.internal:<port>``; member ids are assigned from 0 in
    the order the ports are given.

    Args:
        node_ports: iterable of published host ports, one per node.

    Returns:
        str: an ``rs.initiate({...})`` call with the configuration
        rendered as 2-space-indented JSON.
    """
    members = []
    for member_id, port in enumerate(node_ports):
        members.append({"_id": member_id, "host": f"host.docker.internal:{port}"})

    config_json = json.dumps({"_id": "main", "members": members}, indent=2)
    return "rs.initiate(" + config_json + ")"


def main():
    """Initiate the replica set from inside the first ``mongo`` container.

    Steps: discover the published node ports, render ``init.js``, copy it
    into the container whose name matches ``mongo_1``, and feed it to the
    ``mongo`` shell there. Progress is echoed to stdout.

    Raises:
        RuntimeError: if no node is running, or if no container matching
            ``mongo_1`` is listed by ``docker-compose ps``.
        subprocess.CalledProcessError: if a docker command fails.
    """
    ports = get_node_ports()
    init_js = generate_init_js(ports)
    print("\x1b[32m$\x1b[0m cat init.js")
    print(init_js)

    # The lookup needs a shell pipeline; nothing is interpolated into it.
    # The pipeline's exit status is awk's, so "no match" shows up as
    # empty output rather than as an error.
    matches = sp.check_output(
        "docker-compose ps | grep mongo_1 | awk '{ print $1 }'",
        shell=True,
        encoding="utf-8",
    ).split()
    if not matches:
        raise RuntimeError("no container matching 'mongo_1' was found.")
    # grep may match more than one name (e.g. mongo_1 and mongo_10);
    # use the first listed one instead of a multi-line name.
    container_name = matches[0]

    with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as f:
        f.write(init_js)
        # Make sure the content is on disk before docker reads the file.
        f.flush()

        # Argument lists avoid shell quoting problems with the temp path.
        sp.check_output(["docker", "cp", f.name, f"{container_name}:/init.js"])
        print("\x1b[32m$\x1b[0m copy init.js")

    output = sp.check_output(
        ["docker", "exec", container_name, "sh", "-c", "mongo --port 27017 < /init.js"],
        encoding="utf-8",
    )
    print("\x1b[32m$\x1b[0m initiate")
    print(output)


if __name__ == "__main__":
    main()

执行的结果如下

$ cat init.js
rs.initiate({
  "_id": "main",
  "members": [
    {
      "_id": 0,
      "host": "host.docker.internal:27025"
    },
    {
      "_id": 1,
      "host": "host.docker.internal:27023"
    },
    {
      "_id": 2,
      "host": "host.docker.internal:27024"
    }
  ]
})
$ copy init.js
$ initiate
MongoDB shell version v4.0.11
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("af1e1d7a-69aa-406d-abb3-73b422f3f093") }
MongoDB server version: 4.0.11
{
  "ok" : 1,
  "operationTime" : Timestamp(1565015581, 1),
  "$clusterTime" : {
    "clusterTime" : Timestamp(1565015581, 1),
    "signature" : {
      "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
      "keyId" : NumberLong(0)
    }
  }
}
bye

测试

搭建完成了,接下来用以下代码测试一下

import asyncio
import logging
import socket

from unittest import mock

import pytest

from motor.motor_asyncio import AsyncIOMotorClient

from utils import get_node_ports


@pytest.fixture(scope="session")
def node_addresses():
    """Session-scoped list of ``("localhost", port)`` pairs, one per node port."""
    addresses = []
    for port in get_node_ports():
        addresses.append(("localhost", port))
    return addresses


@pytest.fixture(autouse=True)
def hook_getaddrinfo():
    """Patch ``socket.getaddrinfo`` for the duration of each test.

    Lookups of ``host.docker.internal`` are redirected to ``localhost``;
    every other lookup is passed through unchanged.
    """
    real_getaddrinfo = socket.getaddrinfo

    def container_2_host_address(host, port, family=0, type=0, proto=0, flags=0):
        """
        MongoDB hands out the replicas' addresses as seen from inside
        the containers, so they must be replaced with the address that
        is correct on the host.
        """
        if host != "host.docker.internal":
            logging.debug("no change %r", (host, port))
            return real_getaddrinfo(host, port, family, type, proto, flags)

        logging.debug("container 2 host")
        return real_getaddrinfo("localhost", port, family, type, proto, flags)

    patcher = mock.patch("socket.getaddrinfo", new=container_2_host_address)
    with patcher:
        yield


@pytest.fixture
def uri(node_addresses):
    """MongoDB connection URI listing every node, targeting replica set ``main``."""
    hosts = [f"{host}:{port}" for host, port in node_addresses]
    return "mongodb://" + ",".join(hosts) + "/?replicaSet=main"


@pytest.fixture
async def create_db(uri):
    """Empty the ``default`` database, then hand out a database factory.

    Returns:
        An async callable; each call opens a new client on ``uri`` and
        returns its ``default`` database.
    """

    async def creator():
        return AsyncIOMotorClient(uri)["default"]

    database = await creator()

    # Drop every existing collection so each test starts from a clean state.
    for name in await database.list_collection_names():
        await database.drop_collection(name)

    return creator


async def get_latest_oplog_timestamp(client):
    """
    Return the ``ts`` value of the most recent oplog entry.

    Reads ``local.oplog.rs`` in reverse natural order via ``find_one``
    instead of the cursor's ``fetch_next`` / ``next_object()`` pair,
    which is deprecated and no longer available in newer Motor releases.

    Raises:
        RuntimeError: if the oplog contains no entries.
    """
    latest_entry = await client.local.oplog.rs.find_one(sort=[("$natural", -1)])
    if latest_entry is None:
        raise RuntimeError("oplog is empty; has the replica set been initiated?")
    return latest_entry["ts"]


@pytest.mark.asyncio
async def test_change_stream(create_db):
    """An insert is delivered through a change stream on the collection.

    The watcher is started as a background task, a document is inserted,
    and the first ``insert`` event received must carry that document.
    The wait is bounded so a missing event fails the test instead of
    hanging it, and the watcher is cancelled if anything goes wrong.
    """
    db = await create_db()
    col_name = "boo"
    await db.create_collection(col_name)
    logging.debug("create collection")

    current_timestamp = await get_latest_oplog_timestamp(db.client)
    logging.debug("latest oplog timestamp %r", current_timestamp)

    async def wait_for_change():
        async with db[col_name].watch(
            pipeline=[{"$match": {"operationType": "insert"}}],
            start_at_operation_time=current_timestamp,
        ) as change_stream:
            # Pass startAtOperationTime so the insert event is not missed
            # even if the stream opens after the insert has happened.
            # https://docs.mongodb.com/manual/changeStreams/#start-time
            # The value has to be a bson.Timestamp object.
            logging.debug("wait for change")
            async for change in change_stream:
                return change

    fut = asyncio.ensure_future(wait_for_change())
    try:
        data = {"username": "black"}
        await db[col_name].insert_one(data)
        logging.debug("insert new document %r", data)

        # Bounded wait: fail with a timeout rather than block forever.
        change = await asyncio.wait_for(fut, timeout=10)
    finally:
        # Do not leave the watcher task pending if the insert or the
        # wait failed.
        if not fut.done():
            fut.cancel()
    logging.debug("received change %r", change)

    assert data == change["fullDocument"]

从以上的代码可以看到,测试利用 unittest.mock.patch 替换了 socket.getaddrinfo 函数,目的是改写 MongoDB Replica Set 下发的节点地址。下发的地址是在容器内用来访问宿主机的 host.docker.internal,在本机上运行测试时需要把它替换成本地主机地址 localhost,否则会因为无法解析该主机名而导致测试运行失败。如果把程序打包成 docker 镜像,直接在 docker 容器中开发测试的话,就可以去掉这一部分代码了。

pytest test.py --log-cli-level debug

把代码写入 test.py,再执行以上命令

============================= test session starts ============================== platform darwin -- Python 3.7.4, pytest-5.0.1, py-1.8.0, pluggy-0.12.0 rootdir: /Users/linw1995/Documents/me/tmp plugins: asyncio-0.10.0 collected 1 item test.py::test_change_stream -------------------------------- live log setup -------------------------------- DEBUG  asyncio:selector_events.py:53 Using selector: KqueueSelector DEBUG  asyncio:selector_events.py:53 Using selector: KqueueSelector DEBUG  root:test.py:33 no change ('localhost', 27025) DEBUG  root:test.py:33 no change ('localhost', 27024) DEBUG  root:test.py:33 no change ('localhost', 27023) DEBUG  root:test.py:30 container 2 host DEBUG  root:test.py:30 container 2 host DEBUG  root:test.py:30 container 2 host DEBUG  root:test.py:30 container 2 host -------------------------------- live log call --------------------------------- DEBUG  root:test.py:33 no change ('localhost', 27025) DEBUG  root:test.py:33 no change ('localhost', 27024) DEBUG  root:test.py:33 no change ('localhost', 27023) DEBUG  root:test.py:30 container 2 host DEBUG  root:test.py:30 container 2 host DEBUG  root:test.py:30 container 2 host DEBUG  root:test.py:30 container 2 host DEBUG  root:test.py:78 create collection DEBUG  root:test.py:81 latest oplog timestamp Timestamp(1565015602, 1) DEBUG  root:test.py:91 wait for change DEBUG  root:test.py:30 container 2 host DEBUG  root:test.py:99 insert new document {'username': 'black', '_id': ObjectId('5d483e325c87feb3e9b4bf9d')} DEBUG  root:test.py:102 received change {'_id': {'_data': '825D483E32000000022B022C0100296E5A10044B41D3216978416C8D3372D0F6FCEA4046645F696400645D483E325C87FEB3E9B4BF9D0004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1565015602, 2), 'fullDocument': {'_id': ObjectId('5d483e325c87feb3e9b4bf9d'), 'username': 'black'}, 'ns': {'db': 'default', 'coll': 'boo'}, 'documentKey': {'_id': ObjectId('5d483e325c87feb3e9b4bf9d')}} PASSED [100%] =========================== 1 passed in 2.64 
seconds ===========================

成功了~这样用来做开发测试真是十分的方便呢

参考

  1. Replication | MongoDB
  2. Change Streams | MongoDB
  3. I WANT TO CONNECT FROM A CONTAINER TO A SERVICE ON THE HOST | Docker for Mac
  4. I WANT TO CONNECT FROM A CONTAINER TO A SERVICE ON THE HOST | Docker for Windows



comments powered by Disqus