函数异步化🔗
Ariadne 在 util.async_exec 模块里提供了 io_bound 与 cpu_bound 两个函数,
可以用于方便的将普通函数异步化以防止阻塞.
用法🔗
首先导入:
from graia.ariadne.util.async_exec import io_bound, cpu_bound
然后对函数包装:
@cpu_bound
def foo(n: int) -> int:
ans = 0
for i in range(1, n + 1):
ans += i ** i
return ans
这时, foo 的签名就从原来的 (n: int) -> int 变为了 (n: int) -> Awaitable[int].
在其他地方, 使用 await foo(50000) 来异步地调用这个函数.
警告
这两个装饰器都不能用于动态创建的函数 (如闭包, lambda 表达式).
io_bound 包装过的函数是在单独 线程 中运行的,
cpu_bound 包装过的函数是在单独 进程 中运行的,
限制
因为 cpu_bound 的参数是通过 pickle 传入的, 所以不要想着传递奇奇怪怪的对象作为参数 (文件, 窗口, 进程, 协程等).
也因为这个, cpu_bound 包装的函数 所在模块 在导入时不能执行 Saya Channel 之类上下文的获取操作.
提示
如果你需要在 Saya 模块中包装 cpu_bound 函数, 也是可以的, 但是要将上下文的值设为 Dummy 对象:
# import ...
from graia.ariadne.util.async_exec import IS_MAIN_PROCESS
from graia.ariadne.util import Dummy
if IS_MAIN_PROCESS():
saya = Saya.current()
channel = Channel.current()
...
else:
saya = Dummy()
channel = Dummy()
...
...
这样就可以保证只在主进程中进行上下文的获取了.
充分利用 ParallelExecutor🔗
在你的代码中可以通过 ParallelExecutor.get() 得到一个 ParallelExecutor 实例, 可以在其上运行 to_thread 与 to_process 异步方法.
这些方法可以运行被包装过的和没被包装过的函数.
提示
注意其签名为 (func: Callable[P, R], *args, **kwargs) -> Awaitable[R]. 也就是说, 不用传入元组作为打包的参数了.
进一步控制🔗
要进一步控制, 可以从 graia.ariadne.util.async_exec 导入 ParallelExecutor.
通过提前执行 ParallelExecutor(loop=app.loop, max_thread=max_t, max_proc=max_p) 可以控制并行执行器的最大线程, 进程数.
loop 可以在之后通过 bind_loop 函数传入.
原理🔗
因为被 io_bound cpu_bound 包装的函数在另一个进程中无法访问到其原有函数,
我们在 ParallelExecutor 的类变量 func_mapping 中存储了函数的 __qualname__ 至原函数本身的映射.
这样执行时 io_bound cpu_bound 只传递 __qualname__ 与 __module__ 给实际的执行函数, 它会通过导入 __module__ 完成另一个进程中的函数注册.
之后通过 ParallelExecutor.func_mapping[func_qualname](*args, **kwargs) 完成调用.
社区文档相关章节: 链接