Run Twisted in another thread
Dec 27, 2018
2 minute read

While developing a project that depends on a Twisted-based service, I had to work out how to test it. Here is one solution: run Twisted’s reactor in another thread and delegate every call to Twisted functions to that thread.

Use this fixture to run a reactor in another thread. Because the reactor is not restartable, the fixture’s scope is “session”.

import threading

import pytest

from twisted.internet import reactor as twisted_reactor


@pytest.fixture(scope="session", autouse=True)
def reactor():
    """Run the global Twisted reactor in a background thread for the session.

    The reactor cannot be restarted once it has been stopped, so it is
    started a single time (session scope) and shared by every test.

    Yields:
        The running Twisted reactor. Code outside the reactor thread must
        hand work over to it with ``callFromThread``.
    """
    # Python only allows signal handlers to be installed from the main
    # thread, so they are disabled when the reactor runs elsewhere.
    t = threading.Thread(
        target=twisted_reactor.run,
        kwargs={"installSignalHandlers": False},
        daemon=True,
    )
    t.start()

    yield twisted_reactor

    # stop() has to be executed inside the reactor thread itself.
    twisted_reactor.callFromThread(twisted_reactor.stop)
    t.join()

As an example, write a simple Echo protocol.

from twisted.internet.protocol import Protocol, Factory

class Echo(Protocol):
    """Protocol that sends every received chunk straight back to the peer."""

    def dataReceived(self, data):
        """Write ``data`` back out on this connection's transport."""
        transport = self.transport
        transport.write(data)


class EchoFactory(Factory):
    """Factory that builds a fresh ``Echo`` protocol for each connection."""

    def buildProtocol(self, addr):
        """Return a new ``Echo`` instance; ``addr`` is not used."""
        protocol = Echo()
        return protocol

Use pytest fixtures to run an echo service.

import socket
import threading

import pytest

from twisted.application import internet


@pytest.fixture
def get_unused_port():
    """Provide a callable that returns a currently unused TCP port number.

    Returns:
        A zero-argument function. Each call binds a throwaway socket to
        port 0 so the OS picks a free port, then returns that port number.
        The port is released before returning, so another process could
        in principle claim it before the caller binds to it.
    """

    def unused_port_getter():
        # The context manager closes the socket even if bind() or
        # getsockname() raises, so no descriptor is leaked.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("", 0))
            return s.getsockname()[1]

    return unused_port_getter


@pytest.fixture
def port(get_unused_port):
    """Return one unused TCP port obtained from ``get_unused_port``."""
    free_port = get_unused_port()
    return free_port


@pytest.fixture
def echo_service(reactor, port):
    """Start an echo TCP server on ``port`` and stop it after the test.

    Every call on the service is handed to the reactor thread with
    ``callFromThread``; the test thread blocks on events until the
    service is seen running, and later until ``stopService`` has settled.
    """
    service = internet.TCPServer(port, EchoFactory())
    is_running = threading.Event()
    is_stopped = threading.Event()

    def poll_running():
        # Executed in the reactor thread; re-schedules itself until the
        # service reports that it is running.
        if not service.running:
            reactor.callLater(0.1, poll_running)
            return
        is_running.set()

    def shut_down():
        # Executed in the reactor thread; the event fires whether the
        # Deferred from stopService() succeeds or fails.
        d = service.stopService()
        d.addBoth(lambda _result: is_stopped.set())

    reactor.callFromThread(service.startService)
    reactor.callFromThread(poll_running)
    is_running.wait()

    yield service

    reactor.callFromThread(shut_down)
    is_stopped.wait()

Now, write and run a simple test.

def test_echo_service(echo_service, port):
    """Send a message to the echo service and expect the same bytes back."""
    msg = b"abcdefg"
    # Use the loopback address explicitly instead of relying on how an
    # empty host name resolves; the timeout keeps a broken service from
    # hanging the whole test run.
    with socket.create_connection(("127.0.0.1", port), timeout=5) as conn:
        # sendall() keeps writing until the whole message has been sent.
        conn.sendall(msg)
        # recv() may return fewer bytes than requested, so keep reading
        # until the full message arrived or the peer closed the connection.
        received = b""
        while len(received) < len(msg):
            chunk = conn.recv(len(msg) - len(received))
            if not chunk:
                break
            received += chunk
    assert received == msg

Tip: Write unit tests for the base RPC methods first. Then use stubs to unit-test the higher-level encapsulation.

Complete Code

GIST: Run Twisted in another thread | linw1995

References




comments powered by Disqus