如何在异步编程中正确地使用 Contextvars
May 16, 2020
2 minute read

最近我对一个用 async/await 语法去做异步编程的项目做贡献。 经过一段时间不停地码代码,测试及修复,我总结了如何在异步编程中正确地使用 Contextvars 的经验。

What Is contextvars

contextvars 来源于 PEP-567 ,它提供了 APIs 允许开发者去保存,访问及管理 local context。 它与 thread-local storage (TLS) 类似。但它还支持维护异步代码的 context.

import asyncio
from contextvars import ContextVar, Context

var_what = ContextVar("what")

async def hello():
    try:
        return f"hello {var_what.get()}"
    except LookupError:
        return "hello?"

# 以下代码通过 `python -m asyncio` 去执行
assert await hello() == "hello?"
task_1 = asyncio.create_task(hello())
coroutine_1 = hello()
var_what.set("world")
task_2 = asyncio.create_task(hello())
coroutine_2 = hello()
assert await task_1 == "hello?"
assert await task_2 == "hello world"
assert await coroutine_1 == "hello world"
assert await coroutine_2 == "hello world"

这个例子使用预先创造好的 ContextVar 对象去保存及访问当前 context 里的 what 变量。 这例子还暴露了一个问题,就是 coroutine 对象无法处理异步代码的 context 。 只有 asyncio.Task 在创建的时候会保存当前的 context,及自动的应用该 context 。

查看 contextvars 文档以了解更多信息及用法。

最佳示例

如果你要创建一个 context,首先要做的是创建一些资源,保存在 context 中,然后在该 context 下执行异步代码,最后清理掉 context 中不需要的资源 。 最好的方式是使用 contextlib.asynccontextmanager 函数去做装饰器。

  • 能够用 async-with 语法显得更符合 Python 哲学
  • 资源初始化及清理代码处于同个函数内,更具有可读性

在实际情况下,我们在 web 应用中需要保存请求相关的数据,比如安全密钥及请求参数;需要共享全局资源,比如数据库连接池。所以我码了个如何在异步编程中使用 contextvars 的例子。

from contextlib import asynccontextmanager
from contextvars import copy_context, ContextVar

var_redis_client = ContextVar("redis_client")

@asynccontextmanager
async def create_app_context(settings):
    # 资源初始化过程
    redis_client = object()  # FIXME: 创建 Redis 客户端实例
    var_redis_client.set(redis_client)
    try:
        # 生产当前的 context
        yield copy_context()
    finally:
        # 资源清理过程
        # FIXME: 清理 Redis 客户端实例
        pass

# 以下代码通过 `python -m asyncio` 去执行
settings = {"redis_uri": "redis://..."}
async with create_app_context(settings) as app_ctx:
    assert var_redis_client in copy_context()
    # 用两种方式去获取被保存在当前 context 的 Redis 客户端
    assert var_redis_client.get() is app_ctx[var_redis_client]

测试代码

使用 pytest fixtures 去创建 app context。这样能让测试样例很容易地访问使用

@pytest.mark.asyncio 装饰器来自 pytest-asyncio 第三方库.

import pytest

@pytest.fixture
def settings():
    return {"redis_uri": "redis://..."}

@pytest.fixture
async def app_ctx(settings):
    async with create_app_context(settings) as ctx:
        # 使用 yield fixture 去初始化,产生 context 方便测试样例访问
        yield ctx

@pytest.fixture
def redis_client(app_ctx):
    return app_ctx[var_redis_client]


@pytest.mark.asyncio
async def test_without_context(app_ctx, redis_client):
    assert var_redis_client not in copy_context()
    # 虽然当前 context 没有 Redis 客户端实例
    # 但仍然可以通过 app_ctx 去访问 Redis 客户端实例
    assert redis_client is app_ctx[var_redis_client]

def apply_context(ctx):
    """
    更新当前的 context
    """
    for var in ctx:
        var.set(ctx[var])

@pytest.mark.asyncio
async def test_within_context(app_ctx, redis_client):
    # 必须更新当前的 context,这样才能在测试函数体内访问到 Redis 客户端
    apply_context(app_ctx)

    assert var_redis_client in copy_context()
    assert redis_client is app_ctx[var_redis_client]

因为来自异步 yield_fixture 的 context 无法传递给其他 fixtures 或测试函数,所以我们需要显式地传递 app context。我还写了个简单的函数 apply_context 去把传递进来的 context 应用到当前的 context 里。如果 fixtures 或者测试函数需要 context 的话,就需要先调用这个函数。

如果测试函数被标记成 @pytest.mark.asyncio 的话,最好能隐式地把它的异步 yield_fixtures 里的 context,传递给其他 fixtures 及测试函数。所以我向 pytest-asyncio 提了个 PR ,如果能被合并就好了。

References




comments powered by Disqus