Python异步io

  1. 概念与历史

编程中,我们经常会遇到“并发”这个概念,目的是让软件能充分利用硬件资源,提高性能。并发的方式有多种,多线程,多进程,异步IO等。多线程和多进程更多应用于CPU密集型的场景,比如科学计算的时间都耗费在CPU上,利用多核CPU来分担计算任务。多线程和多进程之间的场景切换和通讯代价很高,不适合IO密集型的场景(关于多线程和多进程的特点已经超出本文讨论的范畴,有兴趣的同学可以自行搜索深入理解)。而异步IO就是非常适合IO密集型的场景,比如网络爬虫和Web服务。

在计算机程序中,IO就是读写磁盘、读写网络的操作,这种读写速度比读写内存、CPU缓存慢得多,前者的耗时是后者的成千上万倍甚至更多。这就导致,IO密集型的场景99%以上的时间都花费在IO等待的时间上。异步IO就是把CPU从漫长的等待中解放出来的方法。这就可以大大提高我们写的软件系统的并发性。这样的软件,可以是网络爬虫,也可以是Web服务等一切IO密集型的系统。

异步IO的优势显而易见,各种语言都通过实现这个机制来提高自身的效率,Python也不例外。Python经历了2和3两个大版本的跃迁。这其中也有对异步IO支持的变化历程。

Python 2的异步IO库

Python 2 时代官方并没有异步IO的支持,但是有几个第三方库通过事件或事件循环(Event Loop)实现了异步IO,它们是:

  • twisted: 是事件驱动的网络库
  • gevent: greenlet + libevent(后来是libev或libuv)。通过协程(greenlet)和事件循环库(libev,libuv)实现的gevent使用很广泛。
  • tornado: 支持异步IO的web框架。自己实现了IOLOOP。

Python 3 官方的异步IO

Python 3.4 加入了asyncio 库,使得Python有了支持异步IO的官方库。这个库,底层是事件循环(EventLoop),上层是协程和任务。asyncio自从3.4 版本加入到最新的 3.7版一直在改进中。

Python 3.4 刚开始的asyncio的协程还是基于生成器的,通过 yield from 语法实现,可以通过装饰器 @asyncio.coroutine (已过时)装饰一个函数来定义一个协程。比如:

1
2
3
4
5
6
7
8
9
10
11
import asyncio

@asyncio.coroutine
def as_test():
print("this is a asyncio_test")

loop = asyncio.get_event_loop()

# 等待as_test()这个协程结束后,阻塞调用run_until_complete()才返回
loop.run_until_complete(as_test())
loop.close()

Python 3.5 引入了两个新的关键字 await 和 async 用来替换 @asyncio.coroutine 和 yield from ,从语言本身来支持异步IO。从而使得异步编程更加简洁,并和普通的生成器区别开来。

注意: 对基于生成器的协程的支持已弃用,并计划在 Python 3.10 中移除。所以,写异步IO程序时只需使用 async 和 await 即可。

Python 3.7 又进行了优化,把API分组为高层级API和低层级API。 我们先看看下面的代码,发现与上面的有什么不同?

1
2
3
4
5
6
7
8
9
10
import asyncio

async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')

# Python 3.7+
#await main()
asyncio.run(main())
---------------------------------------------------------------------------

RuntimeError                              Traceback (most recent call last)

<ipython-input-6-32d3682971b6> in <module>
      8 # Python 3.7+
      9 #await main()
---> 10 asyncio.run(main())


~\Anaconda3\lib\asyncio\runners.py in run(main, debug)
     32     if events._get_running_loop() is not None:
     33         raise RuntimeError(
---> 34             "asyncio.run() cannot be called from a running event loop")
     35 
     36     if not coroutines.iscoroutine(main):


RuntimeError: asyncio.run() cannot be called from a running event loop
1
2
3
4
5
6
7
8
9
10
import asyncio

async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')

# Python 3.7+
await main()
#asyncio.run(main())
Hello ...
... World!

上面问题原因

https://stackoverflow.com/questions/55409641/asyncio-run-cannot-be-called-from-a-running-event-loop

该asyncio.run()文档说:

当另一个asyncio事件循环在同一个线程中运行时,无法调用此函数。

你的情况的问题是,jupyter(IPython中)已经运行的事件循环(对于IPython的≥7.0):

您现在可以在IPython终端和笔记本中的顶层使用async / await,它应该 - 在大多数情况下 - “正常工作”。将IPython更新到版本7+,将IPykernel更新到版本5+,然后您即可参加比赛。

这就是为什么你不需要在jupyter中自己启动事件循环而你可以直接调用的原因await main(url)。

image-20200427180401724

除了用 async 替换 @asyncio.coroutine 和用 await 替换 yield from 外,最大的变化就是关于eventloop的代码不见了,只有一个 async.run()。这就是 3.7 的改进,把eventloop相关的API归入到低层级API,新引进run()作为高层级API让写应用程序的开发者调用,而不用再关心eventloop。除非你要写异步库(比如MySQL异步库)才会和eventloop打交道。

需要注意的是, asyncio.run() 是3.7版新增加的,(所以Python环境要注意),处于暂定API状态。 暂定API,是指被有意排除在标准库的向后兼容性保证之外的应用编程接口。虽然此类接口通常不会再有重大改变,但只要其被标记为暂定,就可能在核心开发者确定有必要的情况下进行向后不兼容的更改(甚至包括移除该接口)。此种更改并不会随意进行 — 仅在 API 被加入之前未考虑到的严重基础性缺陷被发现时才可能会这样做。即便是对暂定 API 来说,向后不兼容的更改也会被视为“最后的解决方案” —— 任何问题被确认时都会尽可能先尝试找到一种向后兼容的解决方案。这种处理过程允许标准库持续不断地演进,不至于被有问题的长期性设计缺陷所困。

从上面关于 asyncio 的发展来看它一直在变化,3.4,3.5,3.6, 3.7 都有很多细节上的变化。当我看到3.7的run()函数时,也发现一年前基于3.6的asnycio写的爬虫不那么优雅了。

这种变化,一方面改善了asyncio本身的性能和使用方便程度,但另一方面也增加了我们使用者的学习成本、Python升级带来的改造的成本。如果你以消极的态度抵制这种变化,可以去学习golang,C++来实现你的程序;如果你以积极的态度迎接这种变化,可以更快的掌握这种变化,并优雅 高效的实现你的程序。

只要你喜欢用Python写程序解决问题,那么就接受并掌握这种变化吧。其实,那种语言不在变,那种技术不在前进。作为程序员,你只有不断地学习和前进。

uvloop

uvloop是用Cython写的,基于libuv这个C语言实现的高性能异步I/O库。asyncio自己的事件循环是用Python写的,用uvloop替换asyncio自己的事件循环可以使asyncio的速度更快。并且使用相当简洁:

1
2
3
4
import asyncio
import uvloop

asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
---------------------------------------------------------------------------

ModuleNotFoundError                       Traceback (most recent call last)

<ipython-input-5-f19f880fc357> in <module>
      1 import asyncio
----> 2 import uvloop
      3 
      4 asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


ModuleNotFoundError: No module named 'uvloop'

如果Windows安装uvloop会出错:

uvloop does not support Windows at the moment

Python的异步IO:API

https://docs.python.org/zh-cn/3/library/asyncio.html?highlight=asyncio#module-asyncio

Python的asyncio是使用 async/await 语法编写并发代码的标准库。通过上一部分的讲解,我们了解了它不断变化的发展历史。到了Python最新稳定版 3.7 这个版本,asyncio又做了比较大的调整,把这个库的API分为了 高层级API和低层级API,并引入asyncio.run()这样的高级方法,让编写异步程序更加简洁。

本节希望提纲挈领地介绍最新 3.7 版的asnycio,先从全局认识Python这个异步IO库。

asyncio的高层级API主要提高如下几个方面:

  • 并发地运行Python协程并完全控制其执行过程;
  • 执行网络IO和IPC;
  • 控制子进程;
  • 通过队列实现分布式任务;
  • 同步并发代码。

asyncio的低层级API用以支持开发异步库和框架:

  • 创建和管理事件循环(event loop),提供异步的API用于网络,运行子进程,处理操作系统信号等;
  • 通过transports实现高效率协议;
  • 通过async/await 语法桥架基于回调的库和代码。

asyncio高级API

高层级API让我们更方便的编写基于asyncio的应用程序。这些API包括:

(1)协程和任务

协程通过 async/await 语法进行声明,是编写异步应用的推荐方式。历史的 @asyncio.coroutineyield from 已经被弃用,并计划在Python 3.10中移除。协程可以通过 asyncio.run(coro, *, debug=False) 函数运行,该函数负责管理事件循环并完结异步生成器。它应该被用作asyncio程序的主入口点,相当于main函数,应该只被调用一次。

任务被用于并发调度协程,可用于网络爬虫的并发。使用 asyncio.create_task() 就可以把一个协程打包为一个任务,该协程会自动安排为很快运行。

协程,任务和Future都是可等待对象。其中,Future是低层级的可等待对象,表示一个异步操作的最终结果

(2)流

流是用于网络连接的高层级的使用 async/await的原语。流允许在不使用回调或低层级协议和传输的情况下发送和接收数据。异步读写TCP有客户端函数 asyncio.open_connection() 和 服务端函数 asyncio.start_server() 。它还支持 Unix Sockets: asyncio.open_unix_connection()asyncio.start_unix_server()

(3)同步原语

asyncio同步原语的设计类似于threading模块的原语,有两个重要的注意事项:

  • asyncio原语不是线程安全的,因此它们不应该用于OS线程同步(而是用threading)
  • 这些同步原语的方法不接受超时参数; 使用asyncio.wait_for()函数执行超时操作。

asyncio具有以下基本同步原语:

  • Lock
  • Event
  • Condition
  • Semaphore
  • BoundedSemaphore

(4)子进程

asyncio提供了通过 async/await 创建和管理子进程的API。不同于Python标准库的subprocess,asyncio的子进程函数都是异步的,并且提供了多种工具来处理这些函数,这就很容易并行执行和监视多个子进程。创建子进程的方法主要有两个:

  • coroutine asyncio.create_subprocess_exec()
  • coroutine asyncio.create_subprocess_shell()

(5)队列

asyncio 队列的设计类似于标准模块queue的类。虽然asyncio队列不是线程安全的,但它们被设计为专门用于 async/await 代码。需要注意的是,asyncio队列的方法没有超时参数,使用 asyncio.wait_for()函数进行超时的队列操作。

因为和标注模块queue的类设计相似,使用起来跟queue无太多差异,只需要在对应的函数前面加 await 即可。asyncio 队列提供了三种不同的队列:

  • class asyncio.Queue 先进先出队列
  • class asyncio.PriorityQueue 优先队列
  • class asyncio.LifoQueue 后进先出队列

(6)异常

asyncio提供了几种异常,它们是:

  • TimeoutError,
  • CancelledError,
  • InvalidStateError,
  • SendfileNotAvailableError
  • IncompleteReadError
  • LimitOverrunError

asyncio低级API

低层级API为编写基于asyncio的库和框架提供支持,有意编写异步库和框架的大牛们需要熟悉这些低层级API。主要包括:

(1)事件循环

事件循环是每个asyncio应用程序的核心。 事件循环运行异步任务和回调,执行网络IO操作以及运行子进程。

应用程序开发人员通常应该使用高级asyncio函数,例如asyncio.run(),并且很少需要引用循环对象或调用其方法。

Python 3.7 新增了 asyncio.get_running_loop()函数。

(2)Futures

Future对象用于将基于低层级回调的代码与高层级的 async/await 代码进行桥接。

Future表示异步操作的最终结果。 不是线程安全的。

Future是一个可等待对象。 协程可以等待Future对象,直到它们有结果或异常集,或者直到它们被取消。

通常,Futures用于启用基于低层级回调的代码(例如,在使用asyncio传输实现的协议中)以与高层级 async/await 代码进行互操作。

(3)传输和协议(Transports和Protocols)

Transport 和 Protocol由低层级事件循环使用,比如函数loop.create_connection()。它们使用基于回调的编程风格,并支持网络或IPC协议(如HTTP)的高性能实现。

在最高级别,传输涉及字节的传输方式,而协议确定要传输哪些字节(在某种程度上何时传输)。

换种方式说就是:传输是套接字(或类似的I/O端点)的抽象,而协议是从传输的角度来看的应用程序的抽象。

另一种观点是传输和协议接口共同定义了一个使用网络I/O和进程间I/O的抽象接口。

传输和协议对象之间始终存在1:1的关系:协议调用传输方法来发送数据,而传输调用协议方法来传递已接收的数据。

大多数面向连接的事件循环方法(例如loop.create_connection())通常接受protocol_factory参数,该参数用于为接受的连接创建Protocol对象,由Transport对象表示。 这些方法通常返回(传输,协议)元组。

(4)策略(Policy)

事件循环策略是一个全局的按进程划分的对象,用于控制事件循环的管理。 每个事件循环都有一个默认策略,可以使用策略API对其进行更改和自定义。

策略定义了上下文的概念,并根据上下文管理单独的事件循环。 默认策略将上下文定义为当前线程。

通过使用自定义事件循环策略,可以自定义get_event_loop()set_event_loop()new_event_loop()函数的行为。

(5)平台支持

asyncio模块设计为可移植的,但由于平台的底层架构和功能,某些平台存在细微的差异和限制。在Windows平台,有些是不支持的,比如 loop.create_unix_connection() and loop.create_unix_server()。而Linux和比较新的macOS全部支持。

总结

Python 3.7 通过对asyncio分组使得它的架构更加清晰,普通写异步IO的应用程序只需熟悉高层级API,需要写异步IO的库和框架时才需要理解低层级的API。

Python的异步IO编程例子

本部分我们讲以Python 3.7 + 的asyncio为例讲解如何使用Python的异步IO。

创建第一个协程

Python 3.7 推荐使用 async/await 语法来声明协程,来编写异步应用程序。我们来创建第一个协程函数:首先打印一行“你好”,等待1秒钟后再打印“已经创建一个协程”。

1
2
3
4
5
6
7
8
9
10
11
12
13
import asyncio

async def build_async():
print('你好')
await asyncio.sleep(1)
print("已经创建一个协程")

# asyncio.run(build_async())
'''
再次声明,这只是jupyter的写法,原因在上面解释过了,不能运行会报错
正常是上面注释语句 asyncio.run(build_async())
'''
await(build_async())
你好
已经创建一个协程

build_async()函数通过 async 声明为协程函数,较之前的修饰器声明更简洁明了。

在实践过程中,什么功能的函数要用async声明为协程函数呢?就是那些能发挥异步IO性能的函数,比如读写文件、读写网络、读写数据库,这些都是浪费时间的IO操作,把它们协程化、异步化从而提高程序的整体效率(速度)。

build_async()函数是通过 asyncio.run()来运行的,而不是直接调用这个函数(协程)。因为,直接调用并不会把它加入调度日程,而只是简单的返回一个协程对象:

1
build_async()
<coroutine object async-def-wrapper.<locals>.build_async at 0x000001EE11AA5348>

那么,如何真正运行一个协程呢?asyncio 提供了三种机制:

(1)asyncio.run() 函数,这是异步程序的主入口,相当于C语言中的main函数。

(2)用await等待协程,比如上例中的 await asyncio.sleep(1) 。再看下面的例子,我们定义了协程 async_await_demo() ,在main()协程中调用两次,第一次延迟1秒后打印“你好”,第二次延迟2秒后打印“学习 python 异步协程”。这样我们通过 await 运行了两个协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import asyncio
import time

async def async_await_demo(msg,delay_time):
await asyncio.sleep(delay_time)
print(msg)

async def main():
print(f"begin at {time.strftime('%H:%M:%S')}")
await async_await_demo("你好",1)
await async_await_demo("学习 python 异步协程",2)
print(f"end at {time.strftime('%H:%M:%S')}")

# asyncio.run(main())
await(main())
begin at 11:24:08
你好
学习 python 异步协程
end at 11:24:11

从起止时间可以看出,两个协程是顺序执行的,总共耗时1+2(11-8)=3秒。

Python time strftime() 函数接收以时间元组,并返回以可读字符串表示的当地时间,格式由参数format决定。

https://docs.python.org/zh-cn/3/library/time.html#time.strptime

(3)通过 asyncio.create_task() 函数并发运行作为 asyncio 任务(Task) 的多个协程。下面,我们用create_task()来修改上面的main()协程,从而让两个say_delay()协程并发运行:

1
2
3
4
5
6
7
8
9
10
async def main():
task1 = asyncio.create_task(async_await_demo("你好",1))
task2 = asyncio.create_task(async_await_demo("学习 python 异步协程",2))
print(f"begin at {time.strftime('%X')}")
await task1
await task2
print(f"end at {time.strftime('%X')}")

# asyncio.run(main())
await(main())
begin at 11:35:51
你好
学习 python 异步协程
end at 11:35:53

从运行结果的起止时间可以看出,两个协程是并发执行的了,总耗时等于最大耗时2秒。

asyncio.create_task() 是一个很有用的函数,在爬虫中它可以帮助我们实现大量并发去下载网页。在Python 3.6中与它对应的是 ensure_future()

可等待对象(awaitables)

可等待对象,就是可以在 await 表达式中使用的对象,前面我们已经接触了两种可等待对象的类型:协程和任务,还有一个是低层级的Future。

asyncio模块的许多API都需要传入可等待对象,比如 run(), create_task() 等等。

(1)协程

协程是可等待对象,可以在其它协程中被等待。协程两个紧密相关的概念是:

  • 协程函数:通过 async def 定义的函数;
  • 协程对象:调用协程函数返回的对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio
import time

async def whattime():
return time.time()

async def main():
# 直接调用协程函数,返回的是协程对象
co = whattime()
print("co is " , type(co))

now = await co
print(f"now is {now}")

now2 = await whattime()
print(f"now is {now2}")

# asyncio.run(main())
await(main())
co is  <class 'coroutine'>
now is 1562643752.6321583
now is 1562643752.6331575

可以看到,直接运行协程函数 whattime()得到的co是一个协程对象,因为协程对象是可等待的,所以通过 await 得到真正的当前时间。now2是直接await 协程函数,也得到了当前时间的返回值。

(2)任务

前面我们讲到,任务是用来调度协程的,以便并发执行协程。当一个协程通过 asyncio.create_task() 被打包为一个 任务,该协程将自动加入程序调度日程准备立即运行。

create_task()的基本使用前面例子已经讲过。它返回的task通过await来等待其运行完。如果,我们不等待,会发生什么?“准备立即运行”又该如何理解呢?先看看下面这个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio
import time

async def whattime(i):
await asyncio.sleep(1)
print(f'calling:{i},now is {time.strftime("%X")}')

async def main():
task = asyncio.create_task(whattime(0))
await task
for i in range(1,5):
asyncio.create_task(whattime(i))

await asyncio.sleep(1)

# asyncio.run(main())
await(main())
calling:0,now is 12:22:58
calling:2,now is 12:22:59
calling:4,now is 12:22:59
calling:1,now is 12:22:59
calling:3,now is 12:22:59

运行这段代码的情况是这样的:

首先,1秒钟后打印一行,这是第9,10行代码运行的结果:

calling:0,now is 12:22:58

接着,停顿1秒后,连续打印4行:

calling:2,now is 12:22:59 calling:4,now is 12:22:59 calling:1,now is 12:22:59 calling:3,now is 12:22:59

从这个结果看,asyncio.create_task()产生的4个任务,我们并没有await,它们也执行了。关键在于第14行的 await,如果把这一行去掉或是sleep的时间小于1秒(比whattime()里面的sleep时间少即可),就会只看到第一行的输出结果而看不到后面四行的输出。这是因为,main()不sleep或sleep少于1秒钟,main()就在whattime()还未来得及打印结果(因为,它要sleep 1秒)就退出了,从而整个程序也退出了,就没有whattime()的输出结果。

再来理解一下“准备立即执行”这个说法。它的意思就是,create_task()只是打包了协程并加入调度队列还未执行,并准备立即执行,什么时候执行呢?在“主协程”(调用create_task()的协程)挂起的时候,这里的“挂起”有两个方式:

  • 一是,通过 await task 来执行这个任务;
  • 另一个是,主协程通过 await sleep 挂起,事件循环就去执行task了。

我们知道,asyncio是通过事件循环实现异步的。在主协程 main()里面,没有遇到 await 时,事件就是执行main()函数,遇到 await 时,事件循环就去执行别的协程,即create_task()生成的whattime()的4个任务,这些任务一开始就是 await sleep 1秒。这时候,主协程和4个任务协程都挂起了,CPU空闲,事件循环等待协程的消息。

如果main()协程只sleep了0.1秒,它就先醒了,给事件循环发消息,事件循环就来继续执行main()协程,而main()后面已经没有代码,就退出该协程,退出它也就意味着整个程序退出,4个任务就没机会打印结果;

如果main()协程sleep时间多余1秒,那么4个任务先唤醒,就会得到全部的打印结果;

如果main()的14行sleep等于1秒时,和4个任务的sleep时间相同,也会得到全部打印结果。这是为什么呢?

我猜想是这样的:4个任务生成在前,第14行的sleep在后,事件循环的消息响应可能有个先进先出的顺序。后面深入asyncio的代码专门研究一下这个猜想正确与否。

(3)Future

它是一个低层级的可等待对象,表示一个异步操作的最终结果。目前,我们写应用程序还用不到它,暂不学习。

如果想要学习,可以进入官网查看详细 https://docs.python.org/zh-cn/3/library/asyncio-future.html

asyncio异步IO协程总结

协程就是我们异步操作的片段。通常,写程序都会把全部功能分成很多不同功能的函数,目的是为了结构清晰;进一步,把那些涉及耗费时间的IO操作(读写文件、数据库、网络)的函数通过 async def 异步化,就是异步编程。

那些异步函数(协程函数)都是通过消息机制被事件循环管理调度着,整个程序的执行是单线程的,但是某个协程A进行IO时,事件循环就去执行其它协程非IO的代码。当事件循环收到协程A结束IO的消息时,就又回来执行协程A,这样事件循环不断在协程之间转换,充分利用了IO的闲置时间,从而并发的进行多个IO操作,这就是异步IO。

写异步IO程序时记住一个准则:需要IO的地方异步。其它地方即使用了协程函数也是没用的。

参考链接

严格声明

本项目涉及的代码及一系列分析过程,仅用于学习交流,切勿用于任何非法用途,后果自当。

未经本人同意,禁止以任何形式,在任何渠道,私自转发或盗文


!!! 违者必追究 ,后果自负 !!!