Python异步协程(asyncio详解)

续上篇讲解 yield from 博客,上篇链接:https://www.cnblogs.com/Red-Sun/p/16889182.html
PS:本博客是个人笔记分享,不需要扫码加群或必须关注什么的(如果外站需要加群或关注的可以直接去我主页查看)
欢迎大家光临ヾ (≧▽≦*)o 我的博客首页https://www.cnblogs.com/Red-Sun/
首先要了解什么是协程,其次知道异步跟同步的区别。(PS:个人喜欢多做比喻,不恰当地方望指正)
本文仅仅是个人学习笔记,有错的地方望各位指点。
如果把进程比作从 A 处到 B 处去这件事,那么线程就是可供选择的多条道路,协程就是道路上特殊路段(类似限速,一整条道路都是特殊路段的话,就是全部由协程实现)
例图如下:

1. 什么是协程(Coroutines)

在了解异步之前,先大致了解一下什么是协程。
网上的讲法有各种:

  • 协程是一种比线程更加轻量级的存在
  • 协程是一种用户级的轻量级线程
  • 协程,又称微线程

大体看过之后就感觉,我好像懂了,有好像没懂,个人感觉有点晕乎乎的,没太明白。(PS: 可能是我个人智商没够不能快速领悟的原因)
个人理解(PS:不涉及其本质来源、底层实现、仅仅就着这个异步爬虫来说):协程就像一条带应急车道的高速公路(具体作用就是让任务有了暂停切换功能)
线程:把需要执行的任务比作汽车,线程就像一条单行且只有一条道的高速公路,只有等前一辆车到达终点后面的车才能出发,如果其中一辆出了事情停在了路上,那么这俩车后面的车就只能原地等待直到它恢复并到达终点才能继续上路。
协程:把需要执行的任务比作汽车,协程就像一条带应急车道的高速公路,如果汽车在中途出了问题就可以直接到一边的应急车道停下处理问题,下一辆车可以直接上路,简单来说就是可以通过程序控制哪辆车行驶,哪辆车在应急车道休息。

2. 同步跟异步

同步跟异步是两个相对的概念:
同步:意味着有序
异步:意味着无序
小故事模拟事件:
小明在家需要完成如下事情:

  1. 电饭锅煮饭大约 30 分钟
  2. 洗衣机洗衣服大约 40 分钟
  3. 写作业大约 50 分钟

在同步情况下:小明需要电饭锅处等待 30 分钟、洗衣机处等待 40 分钟、写作业 50 分钟,总计花费时间 120 分钟。
在异步情况下:小明需要电饭锅处理并启动花费 10 分钟、洗衣机处理并启动花费 10 分钟,写作业花费 50 分钟,总计花费时间 70 分钟。
即同步必须一件事情结束之后再进行下一件事,异步是可以在一件事情没结束就去处理另外一件事情了。
注意:此处异步比同步耗时更短是有前提条件的!要是 I/O 阻塞才可以(说人话:类似电饭锅煮饭,电饭锅可以自行完成这种的)
如果把条件中的电饭锅换成柴火,洗衣机换成搓衣板,那么事情就只能一件一件完成了,两者耗时相近。

3.asyncio 异步协程

asyncio 即 Asynchronous I/O 是 python 一个用来处理并发 (concurrent) 事件的包,是很多 python 异步架构的基础,多用于处理高并发网络请求方面的问题。
此处使用的是 Python 3.5 之后出现的 async/await 来实现协程,需要 yield 实现协程的可以去我上篇博客瞅瞅:点击此处快速跳转

基础补充(比较基础的内容懂的可以直接跳)

  1. 普通函数
def function():
    return 1

2. 由 async 做前缀的普通函数变成了异步函数

async def asynchronous():
    return 1

而异步函数不同于普通函数不可能被直接调用

async def asynchronous():
    return 1

print(asynchronous())


尝试用 send 驱动这个协程

async def asynchronous():
    return 1

asynchronous().send(None)


值有了不过存储在了这个 StopIteration 报错中,于是有了下方的执行器

# -*- coding: utf-8 -*-
# @Time    : 2022/11/22 16:03
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : async_function.py
# @Software: PyCharm

async def asynchronous():
return 1

def run(async_function): # 用 try 解决报错问题,运行协程函数
try:
async_function().send(None)
except StopIteration as r:
return r.value

print(run(asynchronous))


成功执行 (`・ω・´) ゞ(`・ω・´)ゞ


在协程函数中 await 的使用(PS:await 只能使用在有 async 修饰的函数中不然会报错)
await 的作用是挂起自身的协程,直到 await 修饰的协程完成并返回结果(可参照第一点什么是协程中的描述)

# -*- coding: utf-8 -*-
# @Time    : 2022/11/22 16:03
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : await_function.py
# @Software: PyCharm

async def asynchronous():
return 1

async def await_function(): # await 挂起自身函数,等待另外协程函数运行完毕
result = await asynchronous()
return result

def run(async_function): # 用 try 解决报错问题,运行协程函数
try:
async_function().send(None)
except StopIteration as r:
return r.value

print(run(await_function))


执行流程 run 函数 ->await_function 函数 -> 执行到 await 时 ->await_function 挂起(暂停等待)->asynchronous 函数执行并返回 1 ->await_function 继续运行返回 result ->print 打印 result 值

使用进阶

对 asyncio 的使用首先要了解:

  1. 事件循环

创建一个循环类似不停运行的洗衣机,把事件(类似衣服)放到循环中,个人描述就像是将需要清洗的衣服都放到洗衣机中一共处理。

  1. Future

Future 对象表示未完成的计算,还未完成的结果(PS:等待要洗的衣服们(假想成脏衣服堆))

  1. Task

是 Future 的子类, 作用是在运行某个任务的同时可以并发的运行多个任务。(PS:那个脏衣服堆中的单独一件,可以被扔到洗衣机洗的脏衣服)

3.8 版本之前的代码

先讲需要自己创建 loop 的后面再讲 3.8 更新后的更容易记忆一点(PS:3.8 的更为简约想直接看 3.8 版的也可)

1. 下面是一个基础的运行实例
# -*- coding: utf-8 -*-
# @Time    : 2022/11/24 17:32
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 1.py
# @Software: PyCharm

import asyncio
import time

async def async_function(): # async 修饰的异步函数,在该函数中可以添加 await 进行暂停并切换到其他异步函数中
now_time = time.time()
await asyncio.sleep(1) # 当执行 await future 这行代码时(future 对象就是被 await 修饰的函数),首先 future 检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的 Task(任务)等待 future 完成。
print('花费时间:{} 秒'.format(time.time()-now_time))

event = async_function() # 创建协程事件对象

loop = asyncio.get_event_loop() # 通过 get_event_loop 方法获取事件循环对象
loop.run_until_complete(event) # 通过 run_until_complete 方法直接运行 event,该方法会一直等待直到 event 运行完毕
loop.close() # 结束循环

2. 关于 task 对象的操作
(1) 创建任务对象并打印其状态
# -*- coding: utf-8 -*-
# @Time    : 2022/11/24 17:32
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 2.py
# @Software: PyCharm

import asyncio
import time

async def async_function(): # async 修饰的异步函数,在该函数中可以添加 await 进行暂停并切换到其他异步函数中
now_time = time.time()
await asyncio.sleep(1) # 当执行 await future 这行代码时(future 对象就是被 await 修饰的函数),首先 future 检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的 Task(任务)等待 future 完成。
print('花费时间:{} 秒'.format(time.time()-now_time))

event = async_function() # 创建协程事件对象

loop = asyncio.get_event_loop() # 通过 get_event_loop 方法获取事件循环对象
task = loop.create_task(event) # 创建任务对象
print(task) # 任务运行中 task
loop.run_until_complete(task) # 等待 task 运行完毕
print(task) # 任务运行结束 task 状态
loop.close() # 结束循环


运行中:状态显示为 running
运行结束后:状态显示 done,result 为协程函数返回值,因为此函数无返回值所以为 None

(2) 获取 task 返回值
  • 方法一:通过 task.result() 的方法获取返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 10:40
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 3.py
# @Software: PyCharm

import asyncio
import time

async def async_function(): # async 修饰的异步函数,在该函数中可以添加 await 进行暂停并切换到其他异步函数中
now_time = time.time()
await asyncio.sleep(1) # 当执行 await future 这行代码时(future 对象就是被 await 修饰的函数),首先 future 检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的 Task(任务)等待 future 完成。
return '花费时间:{} 秒'.format(time.time() - now_time) # 将打印语句换成返回值

event = async_function() # 创建协程事件对象

loop = asyncio.get_event_loop() # 通过 get_event_loop 方法获取事件循环对象
task = loop.create_task(event) # 创建任务对象
print(task) # 任务运行中 task
try:
print(task.result()) # 任务未完成打印 result 会抛出 InvalidStateError 错误
except asyncio.InvalidStateError as r:
print(r) # InvalidStateError 报错信息
loop.run_until_complete(task) # 等待 task 运行完毕
print(task) # 任务运行结束 task 状态
print(task.result()) # 打印出 task 的返回值
loop.close() # 结束循环

  • 方法二:通过 add_done_callback() 添加完成回调
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 11:15
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 4.py
# @Software: PyCharm

import asyncio
import time

def task_callback(future): # 回调函数获取任务完成后的返回值
print(future.result())

async def async_function(): # async 修饰的异步函数,在该函数中可以添加 await 进行暂停并切换到其他异步函数中
now_time = time.time()
await asyncio.sleep(1) # 当执行 await future 这行代码时(future 对象就是被 await 修饰的函数),首先 future 检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的 Task(任务)等待 future 完成。
return '花费时间:{} 秒'.format(time.time() - now_time) # 将打印语句换成返回值

event = async_function() # 创建协程事件对象
loop = asyncio.get_event_loop() # 通过 get_event_loop 方法获取事件循环对象
task = loop.create_task(event) # 创建任务对象

task.add_done_callback(task_callback) # 为而任务添加回调函数

loop.run_until_complete(task) # 等待 task 运行完毕
loop.close() # 结束循环


通过 Future 的 add_done_callback() 方法来添加回调函数,当任务完成后,程序会自动触发该回调函数,并将对应的 Future 对象作为参数传给该回调函数。
PS:Function 'add_done_callback' doesn't return anything(函数“add_done_callback”不返回任何内容)

3. 多任务 tasks 的实现
(1) 通过 asyncio.wait() 来控制多任务
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 14:12
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 5.py
# @Software: PyCharm
import asyncio
import time

async def async_function(num): # async 修饰的异步函数,在该函数中可以添加 await 进行暂停并切换到其他异步函数中
await asyncio.sleep(num) # 当执行 await future 这行代码时(future 对象就是被 await 修饰的函数),首先 future 检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的 Task(任务)等待 future 完成。
print('协程花费时间:{} 秒'.format(time.time() - now_time))

now_time = time.time() # 程序运行时的时间戳
events = [async_function(num=num) for num in range(1, 4)] # 创建协程事件列表
loop = asyncio.get_event_loop() # 通过 get_event_loop 方法获取事件循环对象
tasks = asyncio.wait(events) # 通过 asyncio.wait(events) 创建多任务对象

loop.run_until_complete(tasks) # 等待 task 运行完毕
loop.close() # 结束循环
print('总运行花费时常:{} 秒'.format(time.time() - now_time))

(2) 多任务获取返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 15:38
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 6.py
# @Software: PyCharm
import asyncio
import time

async def async_function(num): # async 修饰的异步函数,在该函数中可以添加 await 进行暂停并切换到其他异步函数中
await asyncio.sleep(num) # 当执行 await future 这行代码时(future 对象就是被 await 修饰的函数),首先 future 检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的 Task(任务)等待 future 完成。
return '协程花费时间:{} 秒'.format(time.time() - now_time)

now_time = time.time() # 程序运行时的时间戳
loop = asyncio.get_event_loop() # 通过 get_event_loop 方法获取事件循环对象
tasks = [loop.create_task(async_function(num=num)) for num in range(1, 4)] # 通过事件循环的 create_task 方法创建任务列表
events = asyncio.wait(tasks) # 通过 asyncio.wait(tasks) 将任务收集起来

loop.run_until_complete(events) # 等待 events 运行完毕
for task in tasks: # 遍历循环列表,将对应任务返回值打印出来
print(task.result())
loop.close() # 结束循环

print('总运行花费时常:{} 秒'.format(time.time() - now_time))

(3) 通过 add_done_callback() 添加回调
# -*- coding: utf-8 -*-
# @Time    : 2022/11/25 15:58
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 7.py
# @Software: PyCharm
import asyncio
import time

def task_callback(future): # 回调函数获取任务完成后的返回值
print(future.result())

async def async_function(num): # async 修饰的异步函数,在该函数中可以添加 await 进行暂停并切换到其他异步函数中
await asyncio.sleep(num) # 当执行 await future 这行代码时(future 对象就是被 await 修饰的函数),首先 future 检查它自身是否已经完成,如果没有完成,挂起自身,告知当前的 Task(任务)等待 future 完成。
return '协程花费时间:{} 秒'.format(time.time() - now_time)

now_time = time.time() # 程序运行时的时间戳
loop = asyncio.get_event_loop() # 通过 get_event_loop 方法获取事件循环对象
tasks = [] # 任务收集列表(PS:就像脏衣服堆)
for num in range(1, 4):
task = loop.create_task(async_function(num=num)) # 创建单个任务(单件脏衣服)
task.add_done_callback(task_callback) # 为每个任务添加对应的回调函数
tasks.append(task)
events = asyncio.wait(tasks) # 通过 asyncio.wait(tasks) 将任务收集起来 PS:想象成装脏衣服的篮子

loop.run_until_complete(events) # 等待 events 运行完毕

loop.close() # 结束循环

print('红后总运行花费时长:{} 秒'.format(time.time() - now_time))

4. 动态不停添加任务 task 实现

除了像上面第第三点那种设定循环一口气执行的(就像把脏衣服一口气塞进洗衣机),还可以一个一个执行(把脏衣服一件一件放进去)。
方法:另外创建一条线程,在其中创建一个一直循环的事件循环。(PS:换个大地方放下一台能够一直运行的洗衣机,就可以把脏衣服一件一件丢进去了)

(1)同步状态下
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 14:22
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 8.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread

def thread_new_loop(loop): # 创建线程版洗衣机
asyncio.set_event_loop(loop) # 在线程中调用 loop 需要使用 set_event_loop 方法指定 loop
loop.run_forever() # run_forever() 会永远阻塞当前线程,直到有人停止了该 loop 为止。

def function(num): # 同步执行的任务方法
print('任务 {} 花费时间:{}秒'.format(num, time.time() - now_time))
return '任务 {} 完成时间:{}秒'.format(num, time.time() - now_time)

now_time = time.time() # 程序运行时的时间戳
new_loop = asyncio.new_event_loop() # 创建一个新的 loop,get_event_loop()只会在主线程创建新的 event loop,其他线程中调用 get_event_loop() 则会报错
t = Thread(target=thread_new_loop, args=(new_loop,)) # 创建线程
t.start() # 启动线程
even = new_loop.call_soon_threadsafe(function, 1) # 调用 call_soon_threadsafe 实现回调(详细描述往下找)
even.cancel() # 当 call_soon_threadsafe 对象执行 cancel() 方法就会取消该任务事件(当速度够快有概率取消前已经执行)
new_loop.call_soon_threadsafe(function, 2)
new_loop.call_soon_threadsafe(function, 3)


loop.call_soon():传入目标函数和参数,可以将目标函数放到事件循环 loop 中,返回值是一个 asyncio.Handle 对象,此对象内只有一个方法为 cancel() 方法,用来取消回调函数。
loop.call_soon_threadsafe() :比上一个多了个 threadsafe 保护线程安全。

(2)异步状态下

与同步相比,函数为异步函数并且通过 asyncio.run_coroutine_threadsafe() 方法回调

# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 16:16
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 9.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread

def thread_new_loop(loop): # 创建线程版洗衣机
asyncio.set_event_loop(loop) # 在线程中调用 loop 需要使用 set_event_loop 方法指定 loop
loop.run_forever() # run_forever() 会永远阻塞当前线程,直到有人停止了该 loop 为止。

async def async_function(num): # 异步执行的任务方法
await asyncio.sleep(num)
print('异步任务 {} 花费时间:{}秒'.format(num, time.time() - now_time))
return '异步任务 {} 完成时间:{}秒'.format(num, time.time() - now_time)

now_time = time.time() # 程序运行时的时间戳
new_loop = asyncio.new_event_loop() # 创建一个新的 loop,get_event_loop()只会在主线程创建新的 event loop,其他线程中调用 get_event_loop() 则会报错
t = Thread(target=thread_new_loop, args=(new_loop,)) # 创建线程
t.start() # 启动线程
even = asyncio.run_coroutine_threadsafe(async_function(1), new_loop) # 调用 asyncio.run_coroutine_threadsafe 实现回调
even.cancel() # 当 run_coroutine_threadsafe 对象执行 cancel() 方法就会取消该任务事件(当速度够快有概率取消前已经执行)
asyncio.run_coroutine_threadsafe(async_function(2), new_loop)
asyncio.run_coroutine_threadsafe(async_function(3), new_loop)
print('红后主进程运行花费时长:{} 秒'.format(time.time() - now_time))


因为使用了 loop.run_forever()所以会一直启用事件循环到 stop() 的调用终止。
若要主线程退出时子线程也退出,可以设置子线程为守护线程 t.setDaemon(True) 需要在线程执行前设置。

3.8 以后的(PS:只要简单使用直接看这个就行)

运行协程的三种基本方式
async.run() 运行协程
async.create_task()创建 task
async.gather()获取返回值

(1)用 run() 运行协程
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 17:34
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 10.py
# @Software: PyCharm
import asyncio
import time
from threading import Thread

async def async_function(num): # 异步执行的任务方法
await asyncio.sleep(num)
print('异步任务 {} 完成时间:{}秒'.format(num, time.time() - now_time))

now_time = time.time() # 程序运行时的时间戳
asyncio.run(async_function(1)) # 用 asyncio.run 直接运行协程参数为协程函数及其参数

(2)用 create_task() 创建 task
# -*- coding: utf-8 -*-
# @Time    : 2022/11/28 17:37
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 11.py
# @Software: PyCharm
import asyncio
import time

async def async_function(num): # 异步执行的任务方法
await asyncio.sleep(num)
print('异步任务 {} 完成时间:{}秒'.format(num, time.time() - now_time))

async def main(): # 异步主函数用于调度其他异步函数
tasks = [] # tasks 列表用于存放 task
for num in range(1, 4):
tasks.append(asyncio.create_task(async_function(num)))
for task in tasks:
await task

now_time = time.time() # 程序运行时的时间戳
asyncio.run(main()) # 用 asyncio.run 直接运行协程参数为协程函数及其参数
print('【红后】最终执行时间:{}'.format(time.time() - now_time))


PS:必须先通过 asyncio.create_task 将 task 创建到 event loop 中,再通过 await 等待,如果直接用 await 等待则会导致异步变同步

(3) 用 gather() 收集返回值
# -*- coding: utf-8 -*-
# @Time    : 2022/11/29 9:25
# @Author  : 红后
# @Email   : not_enabled@163.com
# @blog    : https://www.cnblogs.com/Red-Sun
# @File    : 实例 12.py
# @Software: PyCharm
import asyncio
import time

async def async_function(num): # 异步执行的任务方法
await asyncio.sleep(num)
return '异步任务 {} 完成时间:{}秒'.format(num, time.time() - now_time)

async def main(): # 异步主函数用于调度其他异步函数
tasks = [] # tasks 列表用于存放 task
for num in range(1, 4):
tasks.append(asyncio.create_task(async_function(num)))
response = await asyncio.gather(tasks[0], tasks[1], tasks[2]) # 将 task 作为参数传入 gather,等异步任务都结束后返回结果列表
print(response)

now_time = time.time() # 程序运行时的时间戳
asyncio.run(main()) # 用 asyncio.run 直接运行协程参数为协程函数及其参数
print('【红后】最终执行时间:{}'.format(time.time() - now_time))


__EOF__

  • 本文作者: 红 后
  • 本文链接: https://www.cnblogs.com/Red-Sun/p/16934843.html
  • 关于博主: 评论和私信会在第一时间回复。或者直接私信我。
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。