最新Python异步编程详解

我们都知道对于I/O相关的程序来说,异步编程可以大幅度的提高系统的吞吐量,因为在某个I/O操作的读写过程中,系统可以先去处理其它的操作(通常是其它的I/O操作),那么Python中是如何实现异步编程的呢?

简单的回答是Python通过协程(coroutine)来实现异步编程。那究竟啥是协程呢?这将是一个很长的故事。 故事要从yield开始说起(已经熟悉yield的读者可以跳过这一节)。

yield

yield是用来生成一个生成器的(Generator), 生成器又是什么呢?这又是一个长长的story,所以这次我建议您移步到这里: 完全理解Python迭代对象、迭代器、生成器,而关于yield是怎么回事,建议看这里:[翻译]PYTHON中YIELD的解释

好了,现在假设你已经明白了yield和generator的概念了,请原谅我这种不负责任的说法但是这真的是一个很长的story啊!

总的来说,yield相当于return,它将相应的值返回给调用next()或者send()的调用者,从而交出了cpu使用权,而当调用者再调用next()或者send()时,又会返回到yield中断的地方,如果send有参数,又会将参数返回给yield赋值的变量,如果没有就跟next()一样赋值为None。但是这里会遇到一个问题,就是嵌套使用generator时外层的generator需要写大量代码,看如下示例:

注意以下代码均在Python3.6上运行调试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#!/usr/bin/env python
# encoding:utf-8


def inner_generator():
i = 0
while True:
i = yield i
if i > 10:
raise StopIteration



def outer_generator():
print("do something before yield")
from_inner = 0
from_outer = 1
g = inner_generator()
g.send(None)
while 1:
try:
from_inner = g.send(from_outer)
from_outer = yield from_inner
except StopIteration:
break


def main():
g = outer_generator()
g.send(None)
i = 0
while 1:
try:
i = g.send(i + 1)
print(i)
except StopIteration:
break


if __name__ == '__main__':
main()

为了简化,在Python3.3中引入了yield from

yield from

使用yield from有两个好处,

  1. 可以将main中send的参数一直返回给最里层的generator,
  2. 同时我们也不需要再使用while循环和send (), next()来进行迭代。

我们可以将上边的代码修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def inner_generator():
i = 0
while True:
i = yield i
if i > 10:
raise StopIteration


def outer_generator():
print("do something before coroutine start")
yield from inner_generator()


def main():
g = outer_generator()
g.send(None)
i = 0
while 1:
try:
i = g.send(i + 1)
print(i)
except StopIteration:
break

if __name__ == '__main__':
main()

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
do something before coroutine start
1
2
3
4
5
6
7
8
9
10

这里inner_generator()中执行的代码片段我们实际就可以认为是协程,所以总的来说逻辑图如下:

coroutine and wrapper

接下来我们就看下究竟协程是啥样子

协程coroutine

协程的概念应该是从进程和线程演变而来的,他们都是独立的执行一段代码,但是不同是线程比进程要轻量级,协程比线程还要轻量级。多线程在同一个进程中执行,而协程通常也是在一个线程当中执行。它们的关系图如下:

process, thread and coroutine

我们都知道Python由于GIL(Global Interpreter Lock)原因,其线程效率并不高,并且在*nix系统中,创建线程的开销并不比进程小,因此在并发操作时,多线程的效率还是受到了很大制约的。所以后来人们发现通过yield来中断代码片段的执行,同时交出了cpu的使用权,于是协程的概念产生了。在Python3.4正式引入了协程的概念,代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio

# Borrowed from http://curio.readthedocs.org/en/latest/tutorial.html.
@asyncio.coroutine
def countdown(number, n):
while n > 0:
print('T-minus', n, '({})'.format(number))
yield from asyncio.sleep(1)
n -= 1

loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(countdown("A", 2)),
asyncio.ensure_future(countdown("B", 3))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

示例显示了在Python3.4引入两个重要概念协程事件循环, 通过修饰符@asyncio.coroutine定义了一个协程,而通过event loop来执行tasks中所有的协程任务。之后在Python3.5引入了新的async & await语法,从而有了原生协程的概念。

async & await

在Python3.5中,引入了aync&await 语法结构,通过"aync def"可以定义一个协程代码片段,作用类似于Python3.4中的@asyncio.coroutine修饰符,而await则相当于"yield from"。

先来看一段代码,这个是我刚开始使用async&await语法时,写的一段小程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python
# encoding:utf-8

import asyncio
import requests
import time


async def wait_download(url):
response = await requests.get(url)
print("get {} response complete.".format(url))


async def main():
start = time.time()
await asyncio.wait([
wait_download("http://www.163.com"),
wait_download("http://www.mi.com"),
wait_download("http://www.google.com")])
end = time.time()
print("Complete in {} seconds".format(end - start))


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

这里会收到这样的报错:

1
2
3
4
5
6
Task exception was never retrieved
future: <Task finished coro=<wait_download() done, defined at asynctest.py:9> exception=TypeError("object Response can't be used in 'await' expression",)>
Traceback (most recent call last):
File "asynctest.py", line 10, in wait_download
data = await requests.get(url)
TypeError: object Response can't be used in 'await' expression

这是由于requests.get()函数返回的Response对象不能用于await表达式,可是如果不能用于await,还怎么样来实现异步呢? 原来Python的await表达式是类似于"yield from"的东西,但是await会去做参数检查,它要求await表达式中的对象必须是awaitable的,那啥是awaitable呢? awaitable对象必须满足如下条件中其中之一:

  • A native coroutine object returned from a native coroutine function .

    原生协程对象

  • A generator-based coroutine object returned from a function decorated with types.coroutine() .

    types.coroutine()修饰的基于生成器的协程对象,注意不是Python3.4中asyncio.coroutine

  • An object with an await method returning an iterator.

    实现了__await__ method,并在其中返回了iterator的对象

根据这些条件定义,我们可以修改代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env python
# encoding:utf-8

import asyncio
import requests
import time


async def download(url): # 通过async def定义的函数是原生的协程对象
print("get %s" % url)
response = requests.get(url)
print(response.status_code)


async def wait_download(url):
await download(url) # 这里download(url)就是一个原生的协程对象
print("get {} data complete.".format(url))


async def main():
start = time.time()
await asyncio.wait([
wait_download("http://www.163.com"),
wait_download("http://www.mi.com"),
wait_download("http://www.baidu.com")])
end = time.time()
print("Complete in {} seconds".format(end - start))


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

至此,程序可以运行,不过仍然有一个问题就是它并没有真正地异步执行 看一下运行结果:

1
2
3
4
5
6
7
8
9
10
get http://www.163.com
200
get http://www.163.com data complete.
get http://www.baidu.com
200
get http://www.baidu.com data complete.
get http://www.mi.com
200
get http://www.mi.com data complete.
Complete in 0.49027466773986816 seconds

会发现程序始终是同步执行的,这就说明仅仅是把涉及I/O操作的代码封装到async当中是不能实现异步执行的。必须使用支持异步操作的非阻塞代码才能实现真正的异步。目前支持非阻塞异步I/O的库是aiohttp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/usr/bin/env python
# encoding:utf-8
import asyncio
import aiohttp
import time


async def download(url): # 通过async def定义的函数是原生的协程对象
print("get: %s" % url)
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(resp.status)
# response = await resp.read()

# 此处的封装不再需要
# async def wait_download(url):
# await download(url) # 这里download(url)就是一个原生的协程对象
# print("get {} data complete.".format(url))


async def main():
start = time.time()
await asyncio.wait([
download("http://www.163.com"),
download("http://www.mi.com"),
download("http://www.baidu.com")])
end = time.time()
print("Complete in {} seconds".format(end - start))


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

再看一下测试结果:

1
2
3
4
5
6
7
get: http://www.mi.com
get: http://www.163.com
get: http://www.baidu.com
200
200
200
Complete in 0.27292490005493164 seconds

可以看出这次是真正的异步了。 好了现在一个真正的实现了异步编程的小程序终于诞生了。 而目前更牛逼的异步是使用uvloop或者pyuv,这两个最新的Python库都是libuv实现的,可以提供更加高效的event loop。

uvloop和pyuv

关于uvloop可以参考uvloop pyuv可以参考这里pyuv

pyuv实现了Python2.x和3.x,但是该项目在github上已经许久没有更新了,不知道是否还有人在维护。 uvloop只实现了3.x, 但是该项目在github上始终活跃。

它们的使用也非常简单,以uvloop为例,只需要添加以下代码就可以了

1
2
3
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

关于Python异步编程到这里就告一段落了,而引出这篇文章的引子实际是关于网上有关Sanic和uvloop的组合创造的惊人的性能,感兴趣的同学可以找下相关文章,也许后续我会再专门就此话题写一篇文章,欢迎交流!