说明
《Python 教程》 持续更新中,提供建议、纠错、催更等加作者微信: gr99123(备注:pandas教程)和关注公众号「盖若」ID: gairuo。跟作者学习,请进入 Python学习课程。欢迎关注作者出版的书籍:《深入浅出Pandas》 和 《Python之光》。
Python 3.4 引进了协程的概念,它使用一种单线程单进程的的方式实现并发。线程是操作系统(或运行时环境)根据调度程序在线程之间切换,而协程决定何时切换协程的是程序员和编程语言。协同程序通过程序员在设定点处挂起和恢复来协同工作多个任务。可以访问 Python 多任务 了解背景知识。协程可以使我们按串行(同步)模型去组织原本分散在不同上下文中(异步+回调)的代码逻辑,使代码逻辑更清晰,同时避免状态同步问题。
协程,又称微线程,英文名 Coroutine,是运行在单线程中的“并发”,协程相比多线程的一大优势就是省去了多线程之间的切换开销,获得了更高的运行效率。协程由于由程序主动控制切换,没有线程切换的开销,所以执行效率极高。对于 IO 密集型任务非常适用,如果是 cpu 密集型,推荐多进程+协程的方式。计算机提供了进程和线程,协程是程序员自己实现的。
python 中协程概念是从 3.4 版本增加的,但 3.4 版本采用是生成器实现,为了将协程和生成器的使用场景进行区分,使语义更加明确,在 python 3.5 中增加了 async 和 await 关键字,用于定义原生协程。Python 中的异步 IO 模块 asyncio 就是基本的协程模块。
因此:
yield from
这是一种等待 asyncio 协程的老方法await
是等待 asyncio 协程的现代方式,推荐使用Python 的生成器 —— 一种特殊的函数,可以生成一系列结果,而不是单个值。从 Python 3.3 开始,添加了表达式 yield from
,它允许生成器将其部分操作委托给另一生成器。
从 Python 3.4 开始,asyncio 模块被添加到标准库中。它允许我们编写清晰易懂的异步代码。虽然从技术上讲,asyncio 的协程可以以不同的方式实现,但在 asyncio 中,它们是使用生成器实现的。@asyncio.coroutine
装饰器是从生成器生成协程的一种方式(可以把一个 generator 标记为 coroutine类型),而 yield from
是等待协程的一种方式——只是实现的细节。这就是为什么 yield from
开始被用于两个“不同的事情”的原因。
从 Python 3.5(PEP 492)开始,协程有了新的语法。现在,您可以使用 async def 定义协程,并使用 await 表达式等待它。这不仅缩短了编写时间,而且使我们更清楚地了解我们使用 asyncio 的协程。
如果您使用的是 Python3.5+,那么您可以忘记将 yield from 用于 asyncio 的协程,而使用 await。
关于 asyncio 等协程更多的使用方法,可以参考 Python 异步编程 中的相关内容。
接下来,我们了解一下协程的本质。
我们要知道,大多数程序,如网络服务、游戏、软件等都是一个巨大的死循环,代码逻辑都在这个死循环里运行,当我们给它一定的数据它就根据数据进行处理,比如游戏,我们给它一定的点击、滑动,它就执行一定的逻辑。在一些逻辑场景中,需要操作同样步骤的多个或者无限个任务,整体来说全部完成这些任务是非常耗时的。我们再来分析多个任务中单一任务,单一任务如果有 IO 密集型的操作,就涉及到等待 IO 的返回。例如:
通常是来自外部存储设备的数据,如硬盘、CD-ROM、以及需要 socket 通信传输的网络数据。如果在等待 IO 结果的时间是浪费的,下一步操作是等待的,后续独立任务由于需要当前任务会受到影响。
这时,我们让程序在前一步等待 IO 结果时挂起,让其他任务运行起来,待得到 IO 结果后再继续执行,这样,按这个机制,将多任务的执行时间大大缩减。
上边这个代码逻辑过程相对我们传统的顺序执行,是一个异步的过程。Python 将一个顺序执行的完成任务程序通过异步的思路分拆执行,被分拆的每个部分就是一个协程(Coroutine)。
我们都熟悉函数,它也被称为子例程(subroutine)、过程(procedure)、子过程(sub-process)等。函数是一系列以执行某项任务为单位的指令。当一个复杂函数的逻辑被分成几个独立的步骤,这些步骤本身就是函数时,这些函数被称为辅助函数或子例程。
Python 中的子例程由负责协调这些子例程使用的主函数调用,子例程只有一个入口点。
协程是子程序的泛化(generalizations ),它们用于协同多任务处理,其中一个进程自愿定期或在空闲时产生(yield )或者放弃控制,以便同时运行多个应用程序。协程和子例程之间的区别是:
对于线程,它是一个操作系统(或运行时环境),根据调度程序在线程之间切换。而在协程中,是程序员和编程语言决定何时切换协同程序。协程通过暂停和恢复程序员设定的多任务来协同工作。
在 Python 中,协程类似于生成器,但没有多少额外的方法,在使用 yield 语句的方式上也有细微的变化。生成器生成用于迭代的数据,而协程也可以消耗数据。
asyncio 是 Python 3.4 版本引入的标准库,直接内置了对异步 IO 的支持。asyncio的编程模型就是一个消息循环。我们从 asyncio 模块中直接获取一个 EventLoop 的引用,然后把需要执行的协程扔到 EventLoop 中执行,就实现了异步 IO。EventLoop 内部就是一个死循环,可以接收传入的协程任务并执行。不过 Python 3.7 开始,就不需要写 EventLoop 了。
我们以协程的典型应用场景网络爬虫为例,一般网络爬虫会涉及多个网站或者网址需要爬取,每个爬取任务分为两个步骤,为获取网页返回数据和对返回数据的解析处理。
我们假设有一批网址,我们想知道这些网址在爬虫访问时返回的状态码(如正常返回内容为 200),最终形成一个列表,我们常规编程方法和协程进行一个对比。首先是顺序执行的代码:
import requests
import asyncio
import time
now = lambda: time.time()
start = now()
urls = [
'https://www.baidu.com',
'https://www.163.com',
'https://www.mi.com',
'https://www.taobao.com',
'https://www.tmall.com',
'https://www.jd.com',
'https://www.sohu.com',
'https://www.tencent.com',
'https://www.sina.com.cn',
'https://www.hao123.com',
'https://www.bilibili.com',
'https://www.58.com',
'https://www.zhihu.com',
'https://www.youku.com',
'https://www.iqiyi.com',
'https://www.126.com',
'https://www.cctv.com',
'https://www.douban.com',
'https://www.meituan.com',
]
se = requests.Session()
res = []
for url in urls:
r = se.get(url)
print('获取:', r.url)
time.sleep(5) # 假装要访问时间增加5秒钟
res.append(r.status_code)
print('处理:', f'<{r.url}>:', r.status_code)
print(res)
print(f'用时:{now()-start}')
'''
获取: https://www.baidu.com/
处理: <https://www.baidu.com/>: 200
获取: https://www.163.com/
处理: <https://www.163.com/>: 200
获取: https://www.mi.com/
处理: <https://www.mi.com/>: 200
...
获取: https://www.126.com/
处理: <https://www.126.com/>: 200
获取: https://www.cctv.com/
处理: <https://www.cctv.com/>: 200
获取: https://www.douban.com/
处理: <https://www.douban.com/>: 418
获取: https://www.meituan.com/error/403
处理: <https://www.meituan.com/error/403>: 200
[
200, 200, 200, 200, 200, 200, 200, 200, 200, 200,
200, 200, 403, 200, 200, 200, 200, 418, 200
]
用时:99.47416377067566
'''
我们得到了这些网站的状态码列表,有些由于我多次爬取返回了4xx 状态码,由于为每个访问模拟增加了 5 秒,总用时 99 秒。
接下来,我们采用异步协程方式编码:
import requests
import asyncio
import time
now = lambda: time.time()
start = now()
urls = [
'https://www.baidu.com',
'https://www.163.com',
'https://www.mi.com',
'https://www.taobao.com',
'https://www.tmall.com',
'https://www.jd.com',
'https://www.sohu.com',
'https://www.tencent.com',
'https://www.sina.com.cn',
'https://www.hao123.com',
'https://www.bilibili.com',
'https://www.58.com',
'https://www.zhihu.com',
'https://www.youku.com',
'https://www.iqiyi.com',
'https://www.126.com',
'https://www.cctv.com',
'https://www.douban.com',
'https://www.meituan.com',
]
se = requests.Session()
# 获取网站响应内容
async def aget(url: str):
print('开始获取:', url)
res = se.get(url)
await asyncio.sleep(5) # 假装要访问时间增加5秒钟
print('获取完成:', url)
return res
# 处理数据
async def handle(res: requests.Response):
code = res.status_code
print('处理:', f'<{res.url}>:', code)
return code
# 执行操作:获取 url 信息再处理(异步)
async def get_url_info(url):
r = await aget(url)
url = await handle(r)
return url
async def main():
tasks = []
for url in urls:
tasks.append(get_url_info(url))
res = await asyncio.gather(*tasks)
print(res)
asyncio.run(main())
print(f'用时:{now()-start}')
'''
开始获取: https://www.baidu.com
开始获取: https://www.163.com
开始获取: https://www.mi.com
开始获取: https://www.taobao.com
...
开始获取: https://www.126.com
开始获取: https://www.cctv.com
开始获取: https://www.douban.com
开始获取: https://www.meituan.com
获取完成: https://www.baidu.com
处理: <https://www.baidu.com/>: 200
获取完成: https://www.163.com
处理: <https://www.163.com/>: 200
获取完成: https://www.mi.com
处理: <https://www.mi.com/>: 200
获取完成: https://www.taobao.com
...
处理: <https://www.douban.com/>: 418
获取完成: https://www.meituan.com
处理: <https://www.meituan.com/error/403>: 200
[
200, 200, 200, 200, 200, 200, 200, 200, 200, 200,
200, 200, 403, 200, 200, 200, 200, 418, 200
]
用时:9.403196096420288
'''
得到了同样的结果,但用时只用 9 秒多,我们还发现执行顺序并没有按获取数据、处理数据的顺序进行执行,而是在等待获取结果的间隙就开始了下一个网站访问,在得到访问响应结果后马上进行数据处理,这样虽然每个访问我们都模拟增加了 5 秒,但总的同时却更少。
在上述协程代码中,我们将访问数据获取数据和处理数据写成两个协程函数,最后统由一个执行数据函数 get_url_info() 来安排它们的调用关系。
最后,我们用到的网络访问三方库 requests 是不个同步库,自身不支持异步访问,我们可以使用 aiohttp 或者 httpx 等支持异步的 http 访问库来操作,进一步提升操作效率:
import asyncio
import time
import httpx
now = lambda: time.time()
start = now()
urls = [
'https://www.baidu.com',
'https://www.163.com',
'https://www.mi.com',
'https://www.taobao.com',
'https://www.tmall.com',
'https://www.jd.com',
'https://www.sohu.com',
'https://www.tencent.com',
'https://www.sina.com.cn',
'https://www.hao123.com',
'https://www.bilibili.com',
'https://www.58.com',
'https://www.zhihu.com',
'https://www.youku.com',
'https://www.iqiyi.com',
'https://www.126.com',
'https://www.cctv.com',
'https://www.douban.com',
'https://www.meituan.com',
]
clt = httpx.AsyncClient()
# 获取网站响应内容
async def aget(url: str):
print('开始获取:', url)
res = await clt.get(url)
await asyncio.sleep(5) # 假装要访问时间增加5秒钟
print('获取完成:', url)
return res
# 处理数据
async def handle(res: httpx.Response):
code = res.status_code
print('处理:', f'<{res.url}>:', code)
return code
# 执行操作:获取 url 信息再处理(httpx 异步)
async def get_web_info(url):
r = await aget(url)
url = await handle(r)
return url
async def main():
tasks = []
for url in urls:
tasks.append(get_web_info(url))
res = await asyncio.gather(*tasks)
print(res)
asyncio.run(main())
print(f'用时:{now()-start}')
'''
开始获取: https://www.baidu.com
开始获取: https://www.163.com
开始获取: https://www.mi.com
...
开始获取: https://www.cctv.com
开始获取: https://www.douban.com
开始获取: https://www.meituan.com
获取完成: https://www.baidu.com
处理: <https://www.baidu.com>: 200
获取完成: https://www.126.com
处理: <https://www.126.com>: 200
获取完成: https://www.jd.com
处理: <https://www.jd.com>: 200
...
获取完成: https://www.iqiyi.com
处理: <https://www.iqiyi.com>: 200
[
200, 200, 200, 200, 200, 200, 200, 200, 200,
200, 302, 302, 302, 200, 200, 200, 200, 418, 302
]
用时:5.840916872024536
'''
换为支持异步的 httpx 库,整体速度提升到了6秒以内。接下来,我们简化代码,看看实际的运行效果。
import asyncio
import time
import httpx
now = lambda: time.time()
start = now()
urls = [
'https://www.baidu.com',
'https://www.163.com',
'https://www.mi.com',
'https://www.taobao.com',
'https://www.tmall.com',
'https://www.jd.com',
'https://www.sohu.com',
'https://www.tencent.com',
'https://www.sina.com.cn',
'https://www.hao123.com',
'https://www.bilibili.com',
'https://www.58.com',
'https://www.zhihu.com',
'https://www.youku.com',
'https://www.iqiyi.com',
'https://www.126.com',
'https://www.cctv.com',
'https://www.douban.com',
'https://www.meituan.com',
]
async def main():
res = []
async with httpx.AsyncClient() as client:
all_r = await asyncio.gather(*[client.get(url) for url in urls])
res = [r.status_code for r in all_r]
print(res)
asyncio.run(main())
print(f'用时:{now()-start}')
'''
[
200, 200, 200, 200, 200, 200, 200, 200, 200, 200, 302,
302, 302, 200, 200, 200, 200, 418, 302
]
用时:0.6375458240509033
'''
注意,这次我们没有模拟每个访问增加 5 秒,仅用 0.6 秒多。由于我们的网站数量较少,在上万基础百万数量级的任务下,就能感受到协程的强悍执行效率!
可见,协程能够有效利用网络访问等 IO 等待时间,提高代码执行效率。IO 操作采用异步形式才有意义,比如网络请求、操作数据库、文件读取写入等,比如上例中的数据处理方法 handle() 也可以写成同步函数。
关于 asyncio 等协程更多的使用方法,可以参考 Python 异步编程 中的相关内容。
协程的优势:
协程和线程区别:
总之,在同一时刻只有一个任务在进行,在是在 IO 等待期间让其他任务进行,它不是真正的并行。
更新时间:2022-04-24 09:36:14 标签:python 协程