详解 Python 协程

基本概念

通常我们认为线程是轻量级的进程,因此我们也把协程理解为轻量级的线程。

在 Python 中并发编程一般都是使用多线程或者多进程来实现的,对于 CPU 密集型任务由于 GIL 的存在我们通常使用多进程,而对于 IO 密集型任务我们可以通过线程调度,让线程在执行 IO 任务时让出 GIL,从而实现表面上的并发。

其实对于 IO 密集型任务我们还有另外一种选择,就是 协程(Coroutine)。协程的作用是在执行函数 A 时可以随时中断去执行函数 B,然后中断函数 B 继续执行函数 A,即可以在任务间自由切换,由用户决定,在哪些地方交出控制权。这一过程看似像多线程,但其实只有一个线程执行,协程就相当于是运行在单线程中的“并发”。

协程有什么优势?

  1. 开销少,执行效率极高:因为子程序(函数)切换不是线程切换,而是由程序自身控制,没有线程上下文切换的开销。所以与多线程相比,线程的数量越多,协程性能的优势就越明显。
  2. 不需要多线程的锁机制:因为只有一个线程,不存在同时写变量冲突,在协程中控制共享资源不需要加锁,也没有状态同步的开销,因此执行效率比多线程高很多。
  3. 方便切换控制流,简化代码逻辑。

协程如何处理CPU密集型任务?

协程可以很好地处理 IO 密集型任务的效率问题,但其本质是个单线程,处理 CPU 密集型任务不是它的长处。如果要充分发挥多核 CPU 的性能,最简单的方法是 多进程+协程,既充分利用多核,又充分发挥协程的高效率,可以获得极高的性能。

进程、线程、协程的对比

进程和线程是抢占式的调度(CPU 决定),协程是协同式的调度(代码决定)。

  • 进程:每个人都领一套工具(环境,上下文)去干活,要干很多工作,人多(核多)就可以做的更快。
  • 线程:一个人有一套工具后干活,要干很多工作,先干哪个要听老板的(系统),遇见要等待的事情,老板会叫你先干别的。但老板不懂你干的活,比如你挖一个坑埋一个萝卜再盖土,结果他让你挖坑、埋土、放萝卜,这时候就要用锁告诉老板这个必须先放了萝卜才能埋土。
  • 协程:一个人有一套工具后干活,要干很多工作,先干哪个由自己决定(代码),遇见要等待的事情,自己会先干别的。对于老板来说,协程内的工作他不会干预,比如他会让你吃饭,但不会规定先吃哪个菜。而正是因为省去了老板的工作量,才大大提高了公司的效率。
  • 串行:一个人有一套工具后干活,要干很多工作,先干哪个完全按写好的清单来,遇见要等待的事情,就等他执行完了再做下一个。

发展历程

  • Python 2.2 中,第一次引入了生成器,生成器实现了一种惰性、多次取值的方法,此时还是通过next构造生成迭代链或进行多次取值。
  • Python 2.5 中,引入了yield关键字,使得生成器有了记忆功能,下一次从生成器取值时,可以恢复到上次yield执行的位置。此外,生成器还加入了send方法与yield搭配使用,可以用yield让生成器暂停到一个状态,还可以通过send方法传入一个值来改变其停止位置的状态。
  • Python 3.3 中,新增了yield from关键字,实现了在生成器(委派生成器)内部调用另外生成器(子生成器)的功能,可以轻易的重构生成器,比如将多个生成器连接在一起执行。
  • Python 3.4 中,新增了 asyncio 库,提供了一个默认的 Event Loop,在语法上支持使用@asyncio.coroutineyield from实现协程,有了足够的基础工具进行异步并发编程。
  • Python 3.5 中,引入了async/await关键字,用以取代@asyncio.coroutineyield from,从而简化了协程的使用并且便于理解。

迭代器和生成器

容器与迭代器

在 Python 中一切皆对象,对象的抽象就是类,而 对象的集合就是容器,比如字符串、列表、元组、字典、集合都是容器。不同容器的区别,在于内部数据结构的实现方法。

所有的容器都是 可迭代的(iterable),即可迭代对象。这里的迭代,和枚举不完全一样。迭代可以想象成是你去买苹果,卖家并不告诉你他有多少库存。这样,每次你都需要告诉卖家,你要一个苹果,然后卖家采取行为:要么给你拿一个苹果;要么告诉你,苹果已经卖完了。你并不需要知道,卖家在仓库是怎么摆放苹果的。

严谨地说,迭代器(iterator)提供了一个 _next_ 方法,调用这个方法后,你要么得到这个容器的下一个对象,要么得到一个 StopIteration 异常。对于可迭代对象,通过调用 iter() 函数可以得到一个迭代器,迭代器可以通过 next() 函数来得到下一个元素,从而实现遍历,for...in... 语句就是将这个过程隐式化。

示例:遍历迭代器对象

1
2
3
4
5
6
7
8
9
iterable_list = iter([1, 2])

while True:
try:
print('ok')
print(next(iterable_list))
except StopIteration:
print('empty')
break

生成器

生成器(generator)是一种特殊的迭代器,它并不会像迭代器一样占用大量内存。合理使用生成器,可以降低内存占用、优化程序结构、提高程序速度。此外,在 Python 2 的版本中,生成器也是实现l协程的一种重要方式。

示例:生成器与迭代器的性能差异比较

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import os
import psutil


def show_memory(hint):
memory = f'{psutil.Process(os.getpid()).memory_full_info().uss / 1024 / 1024:.4f} MB'
print(f'{hint} memory used: {memory}')


def test_iterator():
show_memory('init iterator')
res = [i for i in range(50000000)] # 返回迭代器,其中的每个元素在生成后都会保存到内存中
show_memory('after initiated')
print(f'sum result: {sum(res)}')
show_memory('after sum called')
print()


def test_generator():
show_memory('init generator')
res = (i for i in range(50000000)) # 返回生成器,只有在调用next()函数的时候,才会生成下一个变量
show_memory('after initiated')
print(f'sum result: {sum(res)}')
show_memory('after sum called')
print()


test_iterator()
test_generator()

yield 关键字

yield 应当取其“让步”而非“产出”或“返回”之意,这是理解协程的关键。

yield 有让步之意,因为它交出了程序的控制权,但这个协程并没有结束,下一次执行时,将恢复到之前让出程序控制权的地方,也就是 yield 语句执行的地方继续执行。方法中包含yield表达式后,Python 会将其视作一个生成器对象,不再是普通的方法。

常用方法说明:

  • __next__()方法或next(): 作用是启动或者恢复 generator 的执行,相当于send(None)
  • send(value)方法:作用是发送值给 yield 表达式,启动 generator 会调用send (None)
  • 使用生成器之前需要先调用__next__或者send(None)方法,否则将报错。

代码示例

示例一:yield 的简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def test():
print('generator start')
n = 1
while True:
yield_expression_value = yield n
print(f'yield_expression_value: {yield_expression_value}')
n += 1 # 创建generator对象


test_generator = test()
print(type(test_generator)) # <class 'generator'>
# 启动generator
next_result = next(test_generator) # generator start
print(f'next_result: {next_result}') # 1
# 发送值给yield表达式
send_result = test_generator.send(100) # 100
print(f'send_result: {send_result}') # 2
# 再次调用generator
send_result = next(test_generator) # None
print(f'send_result: {send_result}') # 3

示例二:yield 和 send 与外界的交互流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def jump_range(up_to):
step = 0
while step < up_to:
jump = yield step
print('jump', jump)
if jump is None:
jump = 1
step += jump
print('step', step)


iterator = jump_range(10)
print(next(iterator)) # 0
print(iterator.send(4)) # jump 4; step 4; 4
print(next(iterator)) # jump None; step 5; 5
print(iterator.send(-1)) # jump -1; step 4; 4

示例三:yield 生产者-消费者模型

传统的生产者-消费者模型是一个线程写消息,一个线程读消息,通过锁机制控制队列和等待,但一不小心就可能死锁。如果改用协程,生产者生产消息后,直接通过 yield 跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
"""示例说明:
1. consumer函数是一个generator
2. 把一个consumer传入produce后,首先调用c.send(None)启动生成器
3. 一旦produce产生了消息,通过c.send(n)切换到consumer执行
4. consumer通过yield拿到消息,进行处理,又通过yield把结果传回
5. produce拿到consumer处理的结果,继续生产下一条消息
6. produce决定不生产了,通过c.close()关闭consumer,整个过程结束
7. 整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务
"""


def consumer():
r = ''
while True:
n = yield r
if not n:
return
print(f'[CONSUMER] Consuming {n}...')
r = '200 OK'


def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print(f'[PRODUCER] Producing {n}...')
r = c.send(n)
print(f'[PRODUCER] Consumer return: {r}')
c.close()


C = consumer()
produce(C)

示例四:yield from 表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 子生成器

def gen_n(n):
i = 0
while i < n:
yield i
i += 1


# 委派生成器
def main(n):
print('start')
yield from gen_n(n)
print('do something')
yield from gen_n(n)
print('end')


for e in main(3):
print(e)

asyncio 库

asyncio 是用来编写 并发 代码的库,使用 async/await 语法,是实现高性能 Python 异步框架的基础。

协程与任务

可等待对象

如果一个对象可以在 await 语句 中使用,那么它就是 可等待对象(awaitable)

对 await 关键字的理解:简单的说,await就是挂起当前任务,去执行其他任务,此时是堵塞的,必须要等其他任务执行完毕才能返回到当前任务继续往下执行,这样的说的前提是,在一个时间循环中有多个 task 或 future,当 await 右面等待的对象是协程对象时,就没有了并发的作用,就是堵塞等待这个协程对象完成。

可等待对象有三种主要类型:协程(Coroutine)任务(Task)Future 对象

  1. 协程:Python 协程属于 可等待 对象,因此可以在其他协程中被等待。协程函数的调用不会立即执行,而是会返回一个协程对象。协程对象需要注册到事件循环中,由事件循环调用。

    协程函数:定义形式为 async def 的函数
    协程对象:调用协程函数所返回的对象

  2. 任务:任务 是对协程的进一步封装,其中包含有各种状态,用于 并发 运行多个协程。当一个协程通过 asyncio.create_task() 等函数被封装为一个 任务该协程会被自动调度执行

  3. Future 对象:Future 是一种特殊的 低层级 可等待对象,是 Task 的父类,表示一个异步操作的 最终结果。通常情况下 没有必要 在应用层级的代码中创建 Future 对象。

运行 asyncio 程序

  • asyncio.run(main, *, debug=False)
    • 创建一个新的事件循环并在结束时将循环关闭。
    • 它应当被用作 asyncio 程序 最高层级的 入口点(main 方法),在理想情况下应当只被调用一次。
    • 当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。
    • 如果 debugTrue,事件循环将以调试模式运行。

注:asyncio.run() 是 Python 3.7 中引入的,也是官方 推荐的 运行协程的方式,相当于老版本的以下语句:

1
2
3
4
5
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(coro)
finally:
loop.close()

创建任务

  • asyncio.create_task(coro, *, name=None)
    • 将传入的协程封装为一个 Task,并自动调度它执行,返回 Task 对象。
    • name 不为 None 时,它将使用 Task.set_name() 来设置任务的名称。

同样,此函数在 Python 3.7 中被引入,相当于低层级的 asyncio.ensure_future() 方法。

休眠

  • asyncio.sleep(delay, result=None, *)
    • 阻塞 delay 指定的秒数。
    • 如果指定了 result,则当协程完成时将其返回给调用者。
    • asyncio.sleep() 总是会挂起当前任务,以允许其他任务运行。
    • 将 delay 设为 0 将提供一个经优化的路径以允许其他任务运行,这可供长期间运行的函数使用以避免在函数调用的全过程中阻塞事件循环。

并发运行任务

  • asyncio.gather(*aws, return_exceptions=False)
    • 并发 运行 aws 序列中的所有可等待对象。
    • 如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。
    • 如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。
    • 如果 return_exceptionsFalse,不捕获协程中的异常,错误会完整地 throw 到当前的执行层,但是 aws 序列中的其他可等待对象 不会被取消 并将继续运行(可以在下次 await 的时候执行)。
    • 如果 return_exceptionsTrue,异常会和成功的结果一样处理,并聚合至结果列表。

示例一:协程的异常处理和取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import asyncio
import time


async def worker_1():
await asyncio.sleep(1)
return 1


async def worker_2():
await asyncio.sleep(2)
return 2 / 0 # 发生异常


async def worker_3():
await asyncio.sleep(3)
return 3


async def main():
task_1 = asyncio.create_task(worker_1())
task_2 = asyncio.create_task(worker_2())
task_3 = asyncio.create_task(worker_3())

await asyncio.sleep(2)
task_3.cancel() # 取消任务

res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
print(res) # 输出所有task的执行结果(包括返回值、程序异常)


ts = time.time()
asyncio.run(main()) # [1, ZeroDivisionError('division by zero'), 3]
print('all time: {:.2f}s'.format(time.time() - ts)) # 2.02s

示例二:协程的并发分组运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio


async def coro(tag):
print(">", tag)
await asyncio.sleep(1)
print("<", tag)
return tag


async def main():
group1 = asyncio.gather(*[coro('group 1.{}'.format(i)) for i in range(1, 6)]) # 对任务进行高级别分组
group2 = asyncio.gather(*[coro('group 2.{}'.format(i)) for i in range(1, 4)])
group3 = asyncio.gather(*[coro('group 3.{}'.format(i)) for i in range(1, 10)])

print('start')
group3.cancel() # 分组的任务可以通过调用Task.cancel()来取消
results = await asyncio.gather(group1, group2, group3, return_exceptions=True)
print(results)


asyncio.run(main())

单任务等待

  • asyncio.wait_for(aw, timeout, *)
    • 等待 aw 可等待对象完成,在指定 timeout 秒数后超时。
    • 如果 aw 是一个协程,它将自动被作为任务调度。
    • 如果 timeoutNone,则等待直到完成。
    • 如果发生超时,任务将取消并引发 asyncio.TimeoutError
    • 要避免任务取消,可以加上 asyncio.shield()

示例:单任务超时取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio


async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('hi')


async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout')


asyncio.run(main())

多任务等待

  • asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

    • 并发地运行 aws 可迭代对象中的所有可等待对象,并进入阻塞状态直到满足 return_when 所指定的条件。

    • 返回两个 Task/Future 集合:(done, pending)

    • 此函数不会引发 asyncio.TimeoutError,当超时发生时,未完成的 Task/Future 将在指定秒数后被返回。

    • return_when 指定此函数应在何时返回,它必须为以下常数之一:

      常数 描述
      FIRST_COMPLETED 函数将在任意可等待对象结束或取消时返回
      FIRST_EXCEPTION 函数将在任意可等待对象因引发异常而结束时返回,当没有引发任何异常时它就相当于 ALL_COMPLETED
      ALL_COMPLETED 函数将在 所有 可等待对象结束或取消时返回
    • wait_for() 不同,wait() 在超时发生时不会取消可等待对象。

示例:多任务超时取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio


async def foo():
await asyncio.sleep(1)
return 42


async def bar():
await asyncio.sleep(3600)
return 21


async def main():
t1 = asyncio.create_task(foo())
t2 = asyncio.create_task(bar())

done, pending = await asyncio.wait([t1, t2], timeout=2.0)

if t1 in done:
print(f"The task's result is {t1.result()}")
print(f"The task's status is {t1.done()}")

for p in pending:
p.cancel()
print(f'cancel task: {p.get_name()}')

await asyncio.sleep(1)
print(asyncio.all_tasks())


asyncio.run(main())

gather 和 wait 的区别:asyncio.gather()asyncio.wait() 都可以并发执行多个协程对象或任务,但 gather 主要用于收集结果,它会返回一个列表,按照给定的顺序返回结果;而 wait 会返回一个元组(done, pending),分别表示 已完成任务列表未完成任务列表,列表中的每个任务都是一个 Task 实例,可以通过 task.result() 获得协程返回值。此外,wait 可以指定函数的返回时机和超时时间,是比 gather 更低层级的调用方式。

内省

  • asyncio.current_task(loop=None)

    • 返回当前运行的 Task 实例,如果没有正在运行的任务则返回 None
    • 如果 loopNone 则会使用 get_running_loop() 获取当前事件循环。
  • asyncio.all_tasks(loop=None)

    • 返回事件循环所运行的未完成的 Task 实例的集合。
    • 如果 loopNone 则会使用 get_running_loop() 获取当前事件循环。

流是用于处理网络连接的支持 async/await 的高层级原语。 流允许发送和接收数据,而不需要使用回调或低级协议和传输。

队列

asyncio 队列被设计成与 queue 模块类似,专用于 async/await 代码,但它 不是线程安全 的。asyncio 的队列没有 timeout 形参,使用 asyncio.wait_for() 函数为队列添加超时操作。

使用说明

asyncio.queues 模块提供的类:

  • Queue(maxsize=0, *):先进先出队列
  • LifoQueue(maxsize=0, *):后进先出队列
  • PriorityQueue(maxsize=0, *):优先级队列

asyncio.queues 模块提供的常用方法:

1
2
3
4
5
6
7
8
9
queue.qsize()  # 返回队列的大小(准确值)
queue.empty() # 返回队列是否为空
queue.full() # 返回队列是否为满
queue.put(item) # coroutine方法,向队列中插入一个元素,如果队列为满则阻塞
queue.put_nowait(item) # 立即向队列中插入一个元素,如果队列为满则引发QueueFull异常
queue.get() # coroutine方法,从队列中获取一个元素并返回,如果队列为空则阻塞
queue.get_nowait() # 立即从队列中获取一个元素并返回,如果队列为空则引发QueueEmpty异常
queue.task_done() # 同标准queue模块
queue.join() # coroutine方法,阻塞至队列中所有的元素都被接收和处理完毕

示例:多个并发任务的工作量分配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import asyncio
import random
import time


async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()

# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)

# Notify the queue that the "work item" has been processed.
queue.task_done()

print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()

# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)

# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)

# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at

# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)

print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())

queue.Queue & asyncio.Queue

queue.Queue 和 asyncio.Queue 都是支持多生产者、多消费者的队列,同样是基于 collections.deque 实现,他们都提供了 Queue、LifoQueue、PriorityQueue,所提供的接口也相同。

区别在于 queue.Queue 适用于多线程场景的通信,asyncio.Queue 适用于协程场景的通信。由于 asyncio 的加成,queue.Queue 下的阻塞接口在 asyncio.Queue 中则是以返回协程对象的方式执行,具体差异如下表:

同步原语

Semaphore

asyncio.Semaphore(value=1) 信号量对象,非线程安全。

信号量会管理一个内部计数器,该计数器会随每次 acquire() 调用递减并随每次 release()调用递增。计数器的值永远不会降到零以下;当 acquire() 发现其值为零时,它将保持阻塞直到有某个任务调用了 release()

使用 Semaphore 的推荐方式是通过 async with 语句:

1
2
3
4
5
6
7
8
9
10
11
sem = asyncio.Semaphore(10)
async with sem:
# work with shared resource

---------
# 上述代码等价于:
await sem.acquire()
try:
# work with shared resource
finally:
sem.release()

示例:协程使用 Semaphore 限制并发量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import asyncio


async def hello(sem):
async with sem:
print(asyncio.current_task())
await asyncio.sleep(1)


async def main():
tasks = []
sem = asyncio.Semaphore(10) # 并发执行数为5
for i in range(100): # 启动100个协程
tasks.append(asyncio.create_task(hello(sem)))
await asyncio.gather(*tasks)


asyncio.run(main())

子进程

本节介绍了用于创建和管理子进程的高层级 async/await asyncio API。

复合语句

异步迭代器 async for

async for 语句允许方便地对异步可迭代对象进行迭代。

异步上下文管理器 async with

async with 语句是一种上下文管理器,能够在其 enterexit 方法中暂停执行。

代码示例

示例一:使用协程方法替换普通方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 非协程版本
import time


def crawl_page(url):
print('crawling {}'.format(url))
sleep_time = int(url.split('_')[-1])
time.sleep(sleep_time)
print('OK {}'.format(url))


def main(urls):
for url in urls:
crawl_page(url)


ts = time.time()
main(['url_1', 'url_2', 'url_3', 'url_4'])
print('all time: {:.2f}s'.format(time.time() - ts)) # 10.03s

---------
# 协程版本
import asyncio
import time


async def crawl_page(url):
print('crawling {}'.format(url))
sleep_time = int(url.split('_')[-1])
await asyncio.sleep(sleep_time)
print('OK {}'.format(url))


async def main(urls):
tasks = [asyncio.create_task(crawl_page(url)) for url in urls] # 创建task
for task in tasks: # 或把for循环替换为 await asyncio.gather(*tasks) 其中*tasks为解包列表
await task


ts = time.time()
asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
print('all time: {:.2f}s'.format(time.time() - ts)) # 4.01s

示例二:使用协程实现生产者-消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import asyncio
import random


async def consumer(queue, number):
while True:
val = await queue.get()
print('{} get a val: {}'.format(number, val))
await asyncio.sleep(1)


async def producer(queue, number):
for i in range(5):
val = random.randint(1, 10)
await queue.put(val)
print('{} put a val: {}'.format(number, val))
await asyncio.sleep(1)


async def main():
queue = asyncio.Queue()

consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1'))
consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2'))

producer_1 = asyncio.create_task(producer(queue, 'producer_1'))
producer_2 = asyncio.create_task(producer(queue, 'producer_2'))

await asyncio.sleep(10) # 执行10s
consumer_1.cancel()
consumer_2.cancel()

await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)


asyncio.run(main())

事件循环

事件循环是每个 asyncio 应用的核心。 事件循环会运行异步任务和回调,执行网络 IO 操作,以及运行子进程。

应用开发者通常应当使用高层级的 asyncio 函数,例如 asyncio.run(),应当很少有必要引用循环对象或调用其方法。 本节所针对的主要是低层级代码、库和框架的编写者,他们需要更细致地控制事件循环行为。

以下低层级函数可被用于获取、设置或创建事件循环:

  • asyncio.``get_running_loop()

    返回当前 OS 线程中正在运行的事件循环。如果没有正在运行的事件循环则会引发 RuntimeError。 此函数只能由协程或回调来调用。3.7 新版功能.

  • asyncio.``get_event_loop()

    获取当前事件循环。如果当前 OS 线程没有设置当前事件循环,该 OS 线程为主线程,并且 set_event_loop() 还没有被调用,则 asyncio 将创建一个新的事件循环并将其设为当前事件循环。由于此函数具有相当复杂的行为(特别是在使用了自定义事件循环策略的时候),更推荐在协程和回调中使用 get_running_loop() 函数而非 get_event_loop()。应该考虑使用 asyncio.run() 函数而非使用低层级函数来手动创建和关闭事件循环。3.10 版后已移除: 如果没有正在运行的事件循环则会发出弃用警告。 在未来的 Python 发行版中,此函数将成为 get_running_loop() 的别名。

  • asyncio.``set_event_loop(loop)

    loop 设置为当前 OS 线程的当前事件循环。

  • asyncio.``new_event_loop()

    创建一个新的事件循环。

aiohttp 库

基于 asyncio 框架实现的异步 HTTP 客户端/服务器,用于替代 requests 库。

代码示例

示例一:简单使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# client
import asyncio

import aiohttp


async def main():
async with aiohttp.ClientSession() as session:
async with session.get('http://httpbin.org/get') as response:
print(f'Status: {response.status}')
print(f"Content-type: {response.headers['content-type']}")
html = await response.text()
print(f'Body: {html}')


asyncio.run(main())

---------
# server
from aiohttp import web


async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)


app = web.Application()
app.add_routes([web.get('/', handle), web.get('/{name}', handle)])

if __name__ == '__main__':
web.run_app(app, host='127.0.0.1', port=9643)

示例二:ClientSession 常用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import asyncio

from aiohttp import ClientSession


async def get(url, session: ClientSession):
# 传递参数
params_list = [
{'key1': 'value1', 'key2': 'value2'}, # 字典形式(标准格式,由程序编码)
[('key', 'value1'), ('key', 'value2')], # 元组列表形式(为一个键指定多个值)
'key1=value1&key2=value2' # 字符串形式(不会自动编码)
]
# 自定义headers
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/78.0.3904.108 Safari/537.36"
}
for params in params_list:
# ClientSession.get() 有一个必选的url参数,它必须是一个str或yarl.URL对象
async with session.get(url, params=params, headers=headers) as r:
print(f'url: {r.url}')
print(f'status: {r.status}')
print(f'headers: {r.headers}') # 返回详细的响应头信息
print(f'requestInfo: {r.request_info}') # 返回详细的请求信息,包括url、method、headers

res = await r.text(encoding='utf-8') # 使用response.read()读取二进制内容
print(res)


async def post(url, session: ClientSession):
# 提交数据
data_list = [
'test', # 纯文本格式,text/plain
b'test', # 二进制流格式,application/octet-stream
{'test': 'object'}, # 表单格式,application/x-www-form-urlencoded
]
for data in data_list:
async with session.post(url, data=data) as r:
print(f'url: {r.url}')
print(await r.text())
# json格式,application/json
async with session.post(url, json={'test': 'object'}) as r:
print(f'url: {r.url}')
print(await r.text())


async def cookie(url, session: ClientSession):
async with session.get(url) as r:
print(f'url: {r.url}')
print(await r.json()) # 返回json解码后的内容


async def main():
# 自定义cookies
cookies = {'cookies_are': 'working'}
# 在网络请求中,会话(session)是指同一用户与服务器进行交互的过程,aiohttp使用ClientSession来管理会话。
# 注意:不要为每个请求创建一个会话,最好是为每个应用程序创建一个会话,以执行所有请求。更复杂的情况可能需要对每个站点都有一个会话,例如一个用于Github,另一个用于Facebook。
async with ClientSession(cookies=cookies) as session:
await get('http://httpbin.org/get', session)
await post('http://httpbin.org/post', session)
await cookie('http://httpbin.org/cookies', session)


asyncio.run(main())

示例三:aiohttp 限制并发连接数和超时时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import asyncio
import time

from aiohttp import ClientSession, ClientTimeout, TCPConnector, client_exceptions


async def get():
conn = TCPConnector(limit=0) # 限制同时连接数,默认为100,limit=0时无限制
timeout = ClientTimeout(total=10, connect=2, sock_connect=2, sock_read=10)
# 默认total=5*60, connect=None, sock_connect=None, sock_read=None,为None或0时表示禁用特定的超时检查
# total:单次请求的最长时间(包括建立连接、发送请求和读取响应)
# connect:建立新连接或等待连接池中的空闲连接的最长时间
# sock_connect:建立新连接的最长时间(不是从池中给出的)
# sock_read:从服务端获取消息的最长时间

async with ClientSession(connector=conn, timeout=timeout) as session:
try:
async with session.get('http://127.0.0.1:9643/test') as response:
print(f'Body: {await response.text()}')
except client_exceptions.ServerTimeoutError as e:
print(f'Request timeout: {e}')
finally:
pass


async def main():
st = time.perf_counter()
try:
await asyncio.wait_for(get(), timeout=10) # 协程超时
except asyncio.exceptions.TimeoutError:
print('Coroutines timeout')
finally:
print(f'time: {time.perf_counter() - st}s')


asyncio.run(main())

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!