asyncio 教程🔗
本章摘自 YiriMirai 的文档。
YiriMirai 是一个轻量, 低耦合的 Python QQ Bot 框架.
Python 从 3.4 开始引入了 asyncio 库,提供异步协程支持,从 Python 3.4 到 3.7,对异步的支持一直在不断地进步。可以说,异步就是 Python 未来的趋势之一。
在了解异步之前,你可能已经接触过或听说过很多类似的概念,比如“并发”“并行”“多线程”等等,这么多名词,再加上一个“异步”,让人有些混乱。
不过,虽然看起来很复杂,但是,通过这篇教程,我相信你能够对“异步”和“协程”有一个清晰的把握。
以下是这篇教程涵盖的内容:
- 异步:一种编程范式,独立于编程语言,可以具有多种语言的不同实现。
- async/await:两个新的 Python 关键字,用于规定异步逻辑。
- asyncio:一个 Python 内置的包,提供协程的基础功能和管理协程的方法。
如果你心急了,请直接跳到 使用协程
什么是异步?🔗
在讲异步之前,需要先知道一个与之相对的概念:同步。
其实同步对我们而言并不陌生。在接触到异步之前,我们认知的一切都是同步的。我们可以想象代码一句一句地执行,遇到函数就进入,执行完函数再执行函数后面的东西,所有的执行顺序都严格地和代码书写的顺序一致——这就是同步。
比如这样的代码:
import time
print(1)
time.sleep(10)
print(2)
我们可以清晰地预知它的行为:先打印 1,然后等待 10 秒,最后打印 2。
同步最大的优点是 思路清晰 。 你永远可以知道你的程序是按照怎样的顺序执行的,不用担心任何意料之外的情况 。
但同时,同步也有不可避免的缺点:在某些情境下,它会带来性能的浪费。
以一个生活中的场景为例:你在网上下载一个文件,但是文件很大,需要很长时间才能下完。这时候,你肯定不会盯着进度条一点点走,而是去做点别的事情,读一会书,看一会电影,之类的。
但如果有一个完全“同步”的人,他为自己设定的程序是下载这个文件,然后运行它,这样的话,他为了不让自己的运行逻辑乱掉,就会一直守在电脑前,直到文件下载完成才做下一步的工作。正常人肯定不能忍受这样无聊的等待,也不会希望自己的程序在这种无聊的事上浪费时间。
为了能充分利用这些可能被浪费掉的时间,人们想出过很多方法,比如 多线程 ,比如 回调 。
多线程是最容易想到的一种方式了。我们可以创建多个线程,每个线程内部是同步的,再让这些线程同时运行。这样,在一个线程等待的时候,其余的线程可以继续运行,不会因此停止。
这看上去十分美好,然而现实并不像我们想象得那么简单。虽然线程各自内部的代码是同步的,但不同线程之间的代码执行顺序已经无从确定。换言之,我们引入了“异步”。 代码的执行顺序与书写顺序不一致的现象就是异步 。
熟悉多线程的人应该了解过“锁”的各种花样。两个线程共享资源时,如果同时对同一个变量进行修改,就有可能导致出乎预料的情况。所以人们发明了“锁”,来保证同一时刻只有唯一的线程对某一个变量进行修改。可以说,“锁”是为了制服异步而发明的。
graph TD;
subgraph 执行顺序
H{{b = 1}} --> F[a = 1]
F --> G{{a += b}}
G --> I["print(a)"]
end
subgraph Thread1
A[a = 1] --> B["print(a)"]
B --> E([Output: 2])
end
subgraph Thread2
C{{b = 1}} --> D{{a += b}}
D -.->|Breaking!| E
end
如此看来,异步似乎是一件不太好的事情。它让程序的运行平添了一种不可控的随意性,我们不再能看到代码就直接推演出结果,因为即使是相邻书写的僵局代码,之间也可能会有别的代码执行。
但是 异步相较于同步而言,带来的性能提升实在是太诱人了 。一个网站的服务器,如果是同步的,那么它同时只能让一个用户访问——这简直是不可理喻的事情。异步就像几万年前蹲踞在草原清冷的月光下,用锐利的眼神眺望人类营地篝火的一只秀美的狼,而猎手们则思考着如何给它套上项圈,变成自己忠实的猎犬。
所以,多线程、回调,包括协程,这一系列设计,与其说是为了充分利用计算资源,不如说是为了 在人类可理解的范畴内,更好地制服异步 。
锁的“困境”🔗
多线程和锁的结合,至今仍然是异步编程的最佳选择之一。因此,怎么加锁、在何处加锁,就成为了一门重要的学问。
锁是个好东西,但是不能有太多。一方面,反复地获取锁释放锁会占用运行时间;另一方面,当一个线程长时间持有某个锁时,其他的线程如果想要访问这个资源,也必须在原地等待, 极端情况下,同一时间只有一个线程在执行!
人们在优化锁的使用上花了很大工夫,从中衍生出的各种理论此处不再赘述。随着锁的使用方式逐渐变得复杂,人们发现,如果想要完美地控制锁的粒度,就不得不对花大量的代码去精细地控制每一个锁,这让编写代码的难度大大提高了。
有的时候,控制锁带来的复杂度已经超出了人能忍受的范围。于是我们经常见到许多简单粗暴的操作——比如在 CPython 中臭名昭著的 GIL。它将整个 Python 解释器加锁,来彻底解决 Python 代码内的线程冲突问题;后果就是, 所有的 Python 线程都必须等待这个锁,硬生生地让多线程程序几乎退化成了单线程 。
当然,GIL 的存在也没有完全把多线程的优点抹消掉。比如一个线程在 sleep
的时候,或者在等待网络请求返回的时候,还是会乖乖地释放掉 GIL 锁,让其他线程运行的。不过这时候,释放 GIL 的线程实际上只是在等着,什么都不干,最后还是只有一个线程在运行。
既然还是只有一个线程在运行,那么为什么不干脆用单线程实现呢?
答案是:当然可以。对于这个问题,人们给出了许多答案,其中最著名的是两种:回调和协程 。
从回调到协程(上)🔗
介绍了这么多,我们终于第一次提到了“协程”这个词。不过先不要着急,要想理解协程的概念,我们还需要一些基础的东西。
回到上面刚刚提出的问题,不过这里要换个说法:怎样在单线程中实现异步 ?
其实在刚才的讨论中我们已经知道了问题的答案,那就是,像带着 GIL 的多线程一样,在某个地方需要等待时,就立马切换到别的任务,等待完成之后,再继续刚才的任务。
graph LR;
A["任务1(发起请求)"] -.-> C{{等待}}
A --->|切换到| D[任务2]
C -..-> E{{等待完成}}
E -.-> F["任务3(需要请求结果)"]
D --->|继续执行| F
单线程异步的逻辑看起来就是这么简单,也十分容易理解。但是,易于理解不代表易于实现。当人们真的开始动手写一段单线程异步的代码时,就发现有许多显而易见却难以说明的问题。
“需要等待”是什么?“切换”是怎么完成的?“等待完成”指的是什么?“继续”又是怎么实现的?
这些问题说复杂也复杂,说简单也简单。为了不浪费大家的思考时间,我在这里直接公布答案:
第一个问题的答案其实很直白。哪些任务需要等待,在程序运行之前就能看出来。简单如 sleep
,复杂如网络请求,这些消耗时间,但不怎么消耗 CPU 的任务,就是需要等待的任务。
第二个问题的答案有两种,对这个问题的回答的不同也正是回调与协程最初的分歧之处。协程式的答案稍后会说,暂且不表,先看看回调式的答案,非常简单粗暴:不要切换 。
举个例子:
import requests
# job 1
response = requests.get('http://example.org/very_large_file.txt')
print(response.text.count('e'))
# job 2
for i in range(10):
print(i)
我们有两个任务,一个是抓取网络上的文件,一个是打印数字。网络上的文件非常大,需要很长时间来加载,这时候,我们希望可以在等待文件下载的时候,去执行打印数字的任务。但是问题来了:前两行代码写的严丝合缝,该怎么把打印数字的任务插进去?
实际上,我们知道,一切问题的根源都出在 requests.get
的调用上。这是一个同步的调用,不等到下载完成就不会返回。因此,我们需要的是一个异步的方法,能够在下载完成之前就返回。
def get_async(): ...
# job 1
get_async('http://example.org/very_large_file.txt')
# print(response.text.count('e'))
# job 2
for i in range(10):
print(i)
虽然实际上 Python 并没有哪一个库提供这样的一个 get_async
,但在这里我们不妨做一次迷人的假设。我们希望能有一个异步的 get_async
,调用后会发起一次网络请求,然后立刻返回,这样,程序的流程就顺理成章地走到了打印数字的地方 。
只是有一个问题还没有解决——从哪里读取下载的文件呢?
好像有点尴尬。get_async
创建了下载任务之后,就把它丢到一边不管了,下载完的东西也没有办法拿到。我们当然不允许这种买椟还珠的行为,所以还需要一点点的处理,让我们能够以某种方式,拿到下载的文件。
这就涉及到刚才提出的第三和第四个问题的答案了。“等待完成”自然是指文件下载完成,而“继续”的方式才是重头戏—— 回调 。
回调(callback)是将程序的一部分以函数的形式传递出去,供外部调用的一种模式。这么说有点抽象,我们结合刚才的例子来说明。
在刚才,我们遇到了没办法拿到下载的文件的问题。其实换个角度来看,我们需要的,是一个 能够在下载完成之后,执行一段利用下载的文件中的内容的代码的方法 。
利用回调,我们可以轻松地完成这一点。
def get_async(): ...
# job 1
get_async('http://example.org/very_large_file.txt', callback=job1_continuation)
def job1_continuation(response):
print(response.text.count('e'))
# job 2
for i in range(10):
print(i)
把第一个任务的剩余部分写到回调函数里,然后传给 get_async
。当文件下载完成后,回调函数就会以下载的文件的内容作为参数,调用回调函数。这就成功地将一段同步的代码改造成了异步。可喜可贺,可喜可贺。
不过,好像忘了点什么?
我们并没有说这个要怎么实现,实际上,像 Go
Kotlin
等语言都使用基于线程的协程,并没有 JavaScript
Python
式的基于回调与IO复用的协程。
等下,我刚刚是不是提到了 I/O 复用 ?
没错,这就是 Python 中网络回调的底层实现 select
模块与 selectors
模块,对异步操作提供了最基本的支持。
比如,这是一个简单的异步网络服务器:
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print('accepted', conn, 'from', addr)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read)
def read(conn, mask):
data = conn.recv(1000) # Should be ready
if data:
print('echoing', repr(data), 'to', conn)
conn.send(data) # Hope it won't block
else:
print('closing', conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, mask)
在 Windows 下基于 I/O Completion Port
的模型比基于 Selector
的模型更可拓展(支持子进程等),当然这是题外话了。
从回调到协程(下)🔗
上一节中,我们已经简单说了基于底层 selector
的回调协程,它都需要通过 socket
创建的原始套接字进行操作,非常笨拙且不友好。
回调的本质,是将一个任务分成两部分,在耗时操作之前的部分,和耗时操作之后的部分,后者是前者的继续,或者叫做续体(continuation)。回调式就是把续体写成回调函数的形式,传递到其他地方,这种操作又叫做 续体传递(continuation-passing) 。从这个层面上看,回调是续体传递风格(Continuation-Passing Style, CPS)的一种。
CPS 其实是很早就被研究过的东西,它最初的应用不是在异步,而是在函数式编程中。用 CPS 书写的 IR 可以方便地实现惰性求值,而且因为续体天生就是 Monad,所以顺便可以解决求值顺序的问题。(话说,为什么到处都是 Monad 呢?)上一句话大可不必理解,毕竟我们不是在讲函数式编程,不过是借用一下 CPS 的术语,提供一个新的视角而已。
从 CPS 的角度看,续体到底是什么形式并不重要,只要他能包含任务中尚未完成的部分就可以。于是我们有了一个大胆的想法:续体能不能是这个任务自身呢 ?
这是一个回调式的任务,它被拆成了两半:
def job1(url):
get_async(url, callback=continuation)
def continuation(response):
print(response.text.count('e'))
现在我们要把它拼回去:
from functools import partial
def job1(url, cont=False):
if not cont:
get_async(url, callback=partial(job1, cont=True))
else:
response = url
print(response.text.count('e'))
看起来怪怪的。这里用了一个参数 cont
来指示调用时进入的是任务的前半部分,还是后半部分。回调函数传入是就是这个函数本身,只是用 partial
规定了一下参数,让回调能进入续体部分。
这种奇怪的写法说不上好,可读性也不是很高。他只是把回调和任务本身强行拼在一起而已。
可是有一点优势,至少 在写代码的顺序上,他看起来和同步代码更像了 。这个优点说大也大,说小也小。如果一个函数中,要有很多次异步调用,如果一个一个全部拆分到回调函数里,就会显得特别杂乱(尤其是在 Python 的匿名函数特别丑陋的情况下)。如果我们能找到一个良好的写法,既能把破碎的回调函数拼回去,又能保持异步的优点,那就再好不过了。
问题的关键在哪里?上面这个函数写的很奇怪,原因是它要 实现同一个函数的两次调用执行不同的代码 。
……确实是个很奇怪的需求。不过换一个角度是不是就容易理解了呢?表面看是两次调用执行不同的代码,实际上是 第一次调用后,在某一处暂停,然后第二次调用,就从这个地方继续 。
如果有两个神秘的函数 pause
和 resume
,能让我们实现这一功能,我们立马可以把代码写得十分优雅:
def job1(url):
get_async(url, callback=resume)
response = pause()
print(response.text.count('e'))
执行到 pause
的时候,这个函数暂停,等到 resume
被调用时,才继续执行。
非常好,现在问题只有一个了:怎么才能实现暂停的功能呢?
答案就是:协程。
async 和 await🔗
按照最简单的方式来理解,协程就是可以暂停的函数。
如果你对 Python 的其他部分有一定了解,你一定会想到—— 生成器 (Generator)。
实际上,asyncio 刚刚被引入时,协程就是通过 @asyncio.coroutine
包装生成器而成的。
在 yield
时,函数暂停执行且保留本地变量,直到在其上执行 send()
或 next()
.
对于以下 Python 代码:
def generator():
yield "start"
return "end"
def main():
gen = generator()
start = next(gen)
try:
next(gen)
except StopIteration as e:
end = e.value
print(start, end)
main() # 输出 start end
它的执行流程如下 (箭头反映了控制权的交换):
flowchart TD;
subgraph main
A["gen = generator()"] --> B["start = next(gen)"]
C["next(gen)"]
D["end = e.value"]
D --> E["输出 start, end"]
end
subgraph gen
B --> Q["gen 执行到 yield,返回 'start'"]
Q -->|"返回的 'start' 赋值给 start"| C
C --> R["gen 执行到 return"] --> S["抛出 StopIteration"]
Q -->|"gen 挂起"| R
S -->|"返回的 'end' 赋值给 end"| D
end
Python 的协程就是使用了这个思路,只不过这样创建的协程,控制权不会直接移交给调用者,而是 事件循环 。
那协程怎么互相调用呢?没关系,通过 yield from
语法,可以进一步转交控制权给下层生成器,也就是协程。
没过多久 (Python 3.5),async
与 await
就被加入 Python 了。
使用协程🔗
通过 async
关键字可以将一个函数变为 定义上 的异步函数,通过 await
关键字调用其他的异步函数。
要在协程内部并发多个协程,我们需要 创建任务(Task) 。
我们可以通过 asyncio.create_task()
函数来创建一个任务。
有一个几乎等效的函数,叫做 asyncio.ensure_future()
。
等一下,Future
是什么?
Future
是用于表示 异步运算结果 的对象,它用于将回调式异步编程与 async
await
联系在一起。
通过 Future.result()
方法通知事件循环异步运算完成(无论成功还是失败),事件循环便会将结果传递给正在等待的协程。
Task
对象将协程包裹在 Future
中,用于控制协程执行。
再等等,Future
既然可以表示异步运算结果,那么......
通过 AbstractEventLoop.run_in_executor()
,可以将同步函数在线程/进程内执行,再包装成可以被等待的 Future
对象。
async for
async with
则是通过实现 __aiter__()
__anext__()
__aenter__()
__aexit__()
等 dunder 方法实现的。