说明
《Python 教程》 持续更新中,提供建议、纠错、催更等加作者微信: gr99123(备注:pandas教程)和关注公众号「盖若」ID: gairuo。跟作者学习,请进入 Python学习课程。欢迎关注作者出版的书籍:《深入浅出Pandas》 和 《Python之光》。
以进程、线程、协程、函数/方法作为执行任务程序的基本单位,结合回调、事件循环、信号量等机制,以提高程序整体执行效率和并发能力的编程方式就是异步编程。
除了顺序执行和并行执行的模型之外,还有第三种模型,叫做异步模型,这是事件驱动模型的基础。异步活动的执行模型可以只有一个单一的主控制流,能在单核心系统和多核心系统中运行。
在并发执行的异步模型中,许多任务被穿插在同一时间线上,所有的任务都由一个控制流执行(单一线程)。任务的执行可能被暂停或恢复,中间的这段时间线程将会去执行其他任务。
仅仅说异步的话,基本上不关底层什么事,实现思路很简单,就是在调用时不返回有效的值、而是返回一个称为“future”的用来存储结果的容器,在所请求的数据准备完成后自动写入future中。异步不仅仅可以基于协程,也可以基于线程、进程。
Python 每个函数调用都有自己的堆栈,当这个函数调用完后,这个堆栈就释放了,当遇到yield这样子的关键字的时候,便会暂停当前堆栈,然后退回到上一堆栈中,等待下一次进入,生成器是这样实现的。异步的话select和yield的组合形式,即每当select监听到一个事件的时候,便利用yield将程序的控制权释放,以便执行用户定义的程序,而每当用户yield的时候,程序会继续监听事件,如此循环async await 便是这种形式的语法糖,方便用户编写程序。
python协程的发展时间较长:
asyncio 解决的是异步 IO 编程中的所有问题,它提供了一整套解决方案,不仅仅是协程方面的问题。包括:
由于 asyncio 每个版本都会新增功能,对一些旧的底层的API进行封装,极大地方便的使用者,但正因为此,网上有很多教程使用的接口官方已经不建议直接使用,应该改而使用更加高级的 API,所以在这里记录一下如何使用这些API。
同步是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行。
异步是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。
例如,爬虫下载网页。调度程序调用下载程序后,即可调度其他任务,而无需与该下载任务保持通信以协调行为。不同网页的下载、保存等操作都是无关的,也无需相互通知协调。这些异步操作的完成时刻并不确定。简言之,异步意味着无序。
阻塞是指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。常见的阻塞形式有:网络I/O阻塞、磁盘I/O阻塞、用户输入阻塞等。阻塞是无处不在的,包括CPU切换上下文时,所有的进程都无法真正干事情,它们也会被阻塞。(如果是多核CPU则正在执行上下文切换操作的核不可被利用。)
程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。非阻塞并不是在任何程序级别、任何情况下都可以存在的。仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。
一般来说,长耗时,消耗大量资源,或者容易出错的逻辑,非常适合从请求主流程中剥离出来,异步执行。例如新用户注册,注册成功后,系统通常会发送一封欢迎邮件。发送欢迎邮件的动作就可以从注册流程中剥离出来。另一个例子是用户上传图片,图片上传后通常需要生成不同大小的缩略图。但图片处理的过程不必包含在图片上传处理流程中,用户上传图片成功后就可以结束流程,生成缩略图等处理逻辑可以作为异步任务执行。这样应用服务器避免被图片处理等计算密集型任务压垮,用户也能更快的得到响应。常见的异步执行任务包括:
在同步世界中,我们习惯于线性思考。如果我们有一系列需要不同时间的任务,它们将按照调用顺序执行。然而,在使用并发时,我们需要知道任务的完成顺序与计划的不同。
Event loop 是 asyncio 模块的的核心机制,用它来实现异步 (asynchronous) 的执行工作,事件循环运行异步任务和回调,执行网络IO操作,并运行子流程。否则所有的异步任务都在无法执行和收到执行结果。
实际上 event loop 是一个在背后执行的线程(thread),它是一个循环,不断地循环进行调试和执行协程,以及回调(callbacks),它十分适合将 I/O 类的工作以非同步方式交由 event loop 执行,例如网路通讯、档案读写等等,以利 event loop 进行工作切换。
原则上,我们不需针对 event loop 进行太多操作与干涉, Python 已经将 event loop 进行封装,如果有需要的话,可以使用 asyncio.get_event_loop()
即可取得 event loop 实例(instance) 以进行操作。
常用的事件循环方法有:
# 创建并返回一个新的事件循环对象
loop = asyncio.new_event_loop()
# 运行直到 future ( Future 的实例 ) 被完成,如果是 coroutine object
# 将被隐式调度为 asyncio.Task 来运行,返回 Future 的结果
loop.run_until_complete(main())
todo
如果一个对象可以在 await 语句中使用,那么它就是 可等待 对象。许多内置模块 asyncio API 都被设计为接受可等待对象。可等待 对象有三种主要类型:
asyncio.Future
(有称未来、期程的,无统一翻译)asyncio.Task
(一种 Future,它的子类)可等待对象(awaitable object)通常实现一个 __await__()
特殊方法,它必须返回一个迭代器。协程对象(Coroutine objects)通过 async def
定义的函数返回的是可等待对象。
但是,生成器对象通过 types.coroutine()
或者 asyncio.coroutine()
返回的迭代器也是可等待对象, 但它们并没有实现 __await__()
。
协程函数(coroutine function)是指返回一个 coroutine 对象的函数。协程函数可通过 async def
语句来定义,并可能包含 await、async for 和 async with 关键字。这些特性是由 PEP 492 引入的。协程是一种更通用的子程序形式。子程序在一点输入,在另一点退出。可以在许多不同的点进入、退出和恢复协程。
协程(coroutine)包括两个概念:
@asyncio.coroutine
装饰器操作将 Python 3.10 后废弃不建议使用调用协程函数(async def)不会运行它,它返回一个协同程序对象,就像生成器函数返回生成器对象一样。await 关键字可以从协程中获取返回值,即调用协程。如,下例不会执行 main() 会抛出警告:
import asyncio
async def main():
await asyncio.sleep(1)
print('hello')
main()
'''
test.py:7: RuntimeWarning: coroutine 'main'
was never awaited
main()
RuntimeWarning: Enable tracemalloc to
get the object allocation traceback
'''
它的类型是一个协程:
coro = main()
print(type(coro))
# <class 'coroutine'>
要运行需要用 asyncio 内置模块的 asyncio.run()
方法,run() 使用将协程放入事件循环来执行并返回结果:
asyncio.run(coro)
# hello
当协程中的可等待对象不使用 await 关键字唤醒时,会抛出警告。如上例中的 asyncio.sleep(1)
前不加 await 时将报 RuntimeWarning: coroutine 'sleep' was never awaited
警告信息。
简而言之, coroutine 具有开始(enter)/暂停(exit)以及任意恢复(resume)执行的能力,譬如发出 HTTP GET 要求之后,就暂停执行,转而执行其他工作,等到该 HTTP GET 要求收到响应之后,再转回来恢复执行剩下的工作。
另外 await 语法则是被用来告知 Python 可以在此处暂停执行 coroutine 转而执行其他工作,而且该语法只能在 coroutine 数据内使用,因此 async def 与 await 通常会一起出现。
await 语法的另外重点是 await 之后只能接 awaitables 可等待对象,例如 coroutine 或者是之后会介绍到的 Task, Future 以及有实现 __await__()
方 的对象,所以不是所有的对象或操作都能够用 await 进行暂停。因此,我们在使用三方库时,要看它的相关方法是否实现了可等待特性。
在 event loop 中,工作的执行是以 Task 为单位, event loop 一次仅会执行 1 个 Task, 如果某个 Task 正在等待执行结果,也就是执行到 await 的地方,那么 event loop 将会暂停(suspend) 并将进行任务调度,切换执行其他 Task、回调函数(callback)或者执行其他 I/O 相关的操作。
我们可以将 Task 视为是 coroutine 的再包装,因此可以看到 asyncio.create_task()
函数接受的参数必须是 coroutine ,例如下范建立一个 Task 实例后,交由 event loop 执行:
import asyncio
async def coro():
print('hello')
await asyncio.sleep(1)
print('world')
loop = asyncio.new_event_loop()
task = loop.create_task(coro())
loop.run_until_complete(task)
'''
hello
world
'''
future 是一个容器,或者占位符(placeholder),代表着一个将在未来成为现实的值。在等待网络 I/O 时,函数可以给我们一个容器,承诺在操作完成时,它将用值填充容器。我们保留未来的对象,当它实现时,我们可以调用它的方法来检索实际结果。
与 Task 不同的是, Future 对象并不是对 coroutine 进行再包装,而是作为代表非同步操作最终结果的对象(但并没有执行,是一个预期),因此它有一个 set_result()
方法,可以将结果写入,同时该 Future 也会被标为结束(done)的状态,所以 Future 通常会与 coroutines 或 Tasks 搭配使用,例如:
import asyncio
async def do_async_job(fut):
await asyncio.sleep(2)
fut.set_result('Hello future')
async def main():
loop = asyncio.get_running_loop()
future = loop.create_future()
loop.create_task(do_async_job(future))
# 等到 future 有结果
await future
print(future.result())
asyncio.run(main())
# Hello future
这里指的是 asyncio.Future 而不是 coroutines.futures.Future。
创建异步任务。使用 asyncio.create_task(coro)
方法,返回一个 Task 对象,Task 类继承 Future,在 python3.7 以下版本中使用 asyncio.ensure_future(coro_or_future)
。
见上例。
同时运行可等待序列中的等待对象。如果所有可等待项都成功完成,则结果是返回值的聚合列表。结果值的顺序对应于aws中可等待项的顺序。其内部调用的是 asyncio.ensure_future()
方法。
import asyncio
import time
import atexit
start = time.time()
atexit.register(lambda: print('用时(秒):', time.time()-start))
async def foo(char: str, count: int):
for i in range(count):
print(f"{char}-{i}")
await asyncio.sleep(.5)
async def main():
await asyncio.gather(foo("A", 2), foo("B", 3), foo("C", 2))
if __name__ == '__main__':
asyncio.run(main())
'''
A-0
B-0
C-0
A-1
B-1
C-1
B-2
用时(秒): 1.5046310424804688
'''
另外一个例子:
import asyncio
import time
# 模拟一个任务
async def task(id):
# 任务第一步
print(f'{time.strftime("%X")} Hello {id}!')
await asyncio.sleep(4) # 模拟第一步为 IO 操作,耗时 4 秒
# 任务第二步
print(f'{time.strftime("%X")} end {id}!')
# 异步执行 100 个任务
async def main():
await asyncio.gather(*(task(i) for i in range(100)))
# 执行,同步共用 4秒*100=400秒,异步总共 4 秒
asyncio.run(main())
我们知道,协程本身就只有一个线程,假如这协程阻塞了,那么整个程序也就阻塞了。为此我们在执行一些必然会产生阻塞的代码时,可以把代码放入到其它线程/进程中,这样可以继续执行协程的其它代码了。
asyncio.to_thread()
可以将同步阻塞放入多线程异步执行(python 3.9的新方法,相当于旧版本的 loop.run_in_executor()
):
import asyncio
import threading
import time
import atexit
start = time.time()
atexit.register(lambda: print('用时(秒):', time.time()-start))
def hard_work():
print('thread id:', threading.get_ident())
time.sleep(10)
async def do_async_job():
# hard_work()
await asyncio.to_thread(hard_work)
await asyncio.sleep(1)
print('job done!')
async def main():
task1 = asyncio.create_task(do_async_job())
task2 = asyncio.create_task(do_async_job())
task3 = asyncio.create_task(do_async_job())
await asyncio.gather(task1, task2, task3)
asyncio.run(main())
'''
thread id: 123145559015424
thread id: 123145575804928
thread id: 123145592594432
job done!
job done!
job done!
用时(秒): 11.012189865112305
'''
处理异步执行的结果和完成后执行一定的方法做收尾。要完成这些功能需要Task对象,即asyncio.create_task()的返回值。由于Task继承Future,实现了除Future.set_result() 和 Future.set_exception()外的全部API,而asyncio.Future模仿的是 concurrent.futures.Future类,所以Task很多方法和 在使用线/进程池时用到的方法类似(有细微差别)。
import asyncio
import time
import atexit
start = time.time()
atexit.register(lambda: print('用时(秒):', time.time()-start))
def callback(future):
# 唯一参数是一个Task对象
# print(type(future)) # <class '_asyncio.Task'>
print(future)
# <Task finished name='Task-2' coro=<foo() done,
# defined at E: ... xxx.py:11> result=123>
print(future.result()) # 123 接收返回值
print(future.get_name()) # foo
async def foo():
print("running")
return 123
async def main():
# name 形参3.8及以上版本可用
task = asyncio.create_task(foo(), name="foo")
task.add_done_callback(callback) # 添加回调函数
await task
if __name__ == '__main__':
asyncio.run(main())
'''
<Task finished name='foo' coro=<foo() done,
defined at /Users/hui/.../test2.py:20> result=123>
123
foo
用时(秒): 0.0008900165557861328
'''
使用 asyncio.wait_for(do_async_job(), timeout=1)
设定超时时限:
import asyncio
async def do_async_job():
await asyncio.sleep(2)
print('never print')
async def main():
try:
await asyncio.wait_for(do_async_job(), timeout=1)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
# timeout!
async with 则是为了 async 版的 context manager 而新增的语句。async 版的 context manager 则是实现了 __aenter__()
与 __aexit__()
两个方法即可。
不过 __aenter__()
与 __aexit__()
限制开发者一定要回传 awaitable 对象,如果不回传 awaitable 对象,将会导致程式无法顺利执行。例如:
import asyncio
class AsyncContextManager:
def __init__(self):
self.loop = asyncio.get_event_loop()
async def __aenter__(self):
print('entering context')
return self.loop.create_future()
async def __aexit__(self, exc_type, exc, tb):
print('exiting context')
return self.loop.create_future()
async def main():
async with AsyncContextManager() as acm:
print('in context')
asyncio.run(main())
'''
entering context
in context
exiting context
'''
除了上述方法可以实现异步上下文管理器外,还可以使用 contextlib.asynccontextmanager
装饰器+yield实现,yield前面的代码对应 __aenter__
,其后的代码对应 __aexit__
。
import contextlib
import asyncio
# 加上装饰器
@contextlib.asynccontextmanager
async def foo():
try:
# 进行初始化
yield "返回你要操作的对象"
finally:
# 处理释放资源等操作
pass
async def main():
async with foo() as f:
print(f)
if __name__ == "__main__":
asyncio.run(main())
Python 中的 iterator 指的是有实现 __iter__()
与 __next__()
两个方法的对象,因此 iterator 可以用 for 语句迭代。而 async for 则是为了 async 版的 iterator 而新增的语法, async 版的 iterator 则是需要实现 __aiter__()
与 __anext()__
两个个方法。
以下是实作 async iterator 的范例,可以看到 __anext__()
已经被转为 coroutine, 因此可以在该 iterator 内通过 event loop 执行非同步的工作,使得我们在执行 for 迭代时不会阻塞 event loop。
import asyncio
class AsyncCounter(object):
def __init__(self, stop=None):
self.count = 0
self.stop = stop
def __aiter__(self):
return self
async def __anext__(self):
await asyncio.sleep(1)
self.count += 1
if self.stop == self.count:
raise StopAsyncIteration
return self.count
async def main():
async for i in AsyncCounter(11):
print(i)
asyncio.run(main())
'''
1
2
3
4
5
6
7
8
9
10
'''
常见的有:
更新时间:2024-06-09 09:50:18 标签:python 异步 编程