基本概念
通常我们认为线程是轻量级的进程,因此我们也把协程理解为轻量级的线程。
在 Python 中并发编程一般都是使用多线程或者多进程来实现的,对于 CPU 密集型任务由于 GIL 的存在我们通常使用多进程,而对于 IO 密集型任务我们可以通过线程调度,让线程在执行 IO 任务时让出 GIL,从而实现表面上的并发。
其实对于 IO 密集型任务我们还有另外一种选择,就是 协程(Coroutine) 。协程的作用是在执行函数 A 时可以随时中断去执行函数 B,然后中断函数 B 继续执行函数 A,即可以在任务间自由切换,由用户决定,在哪些地方交出控制权 。这一过程看似像多线程,但其实只有一个线程执行 ,协程就相当于是运行在单线程中的“并发”。
协程有什么优势?
开销少,执行效率极高:因为子程序(函数)切换不是线程切换,而是由程序自身控制,没有线程上下文切换的开销。所以与多线程相比,线程的数量越多,协程性能的优势就越明显。
不需要多线程的锁机制:因为只有一个线程,不存在同时写变量冲突,在协程中控制共享资源不需要加锁,也没有状态同步的开销,因此执行效率比多线程高很多。
方便切换控制流,简化代码逻辑。
协程如何处理CPU密集型任务? 协程可以很好地处理 IO 密集型任务的效率问题,但其本质是个单线程,处理 CPU 密集型任务不是它的长处。如果要充分发挥多核 CPU 的性能,最简单的方法是 多进程+协程 ,既充分利用多核,又充分发挥协程的高效率,可以获得极高的性能。
进程、线程、协程的对比
进程和线程是抢占式的调度(CPU 决定),协程是协同式的调度(代码决定)。
进程 :每个人都领一套工具(环境,上下文)去干活,要干很多工作,人多(核多)就可以做的更快。
线程 :一个人有一套工具后干活,要干很多工作,先干哪个要听老板的(系统),遇见要等待的事情,老板会叫你先干别的。但老板不懂你干的活,比如你挖一个坑埋一个萝卜再盖土,结果他让你挖坑、埋土、放萝卜,这时候就要用锁告诉老板这个必须先放了萝卜才能埋土。
协程 :一个人有一套工具后干活,要干很多工作,先干哪个由自己决定(代码),遇见要等待的事情,自己会先干别的。对于老板来说,协程内的工作他不会干预,比如他会让你吃饭,但不会规定先吃哪个菜。而正是因为省去了老板的工作量,才大大提高了公司的效率。
串行 :一个人有一套工具后干活,要干很多工作,先干哪个完全按写好的清单来,遇见要等待的事情,就等他执行完了再做下一个。
发展历程
Python 2.2 中,第一次引入了生成器,生成器实现了一种惰性、多次取值的方法,此时还是通过next
构造生成迭代链或进行多次取值。
Python 2.5 中,引入了yield
关键字,使得生成器有了记忆功能,下一次从生成器取值时,可以恢复到上次yield
执行的位置。此外,生成器还加入了send
方法与yield
搭配使用,可以用yield
让生成器暂停到一个状态,还可以通过send
方法传入一个值来改变其停止位置的状态。
Python 3.3 中,新增了yield from
关键字,实现了在生成器(委派生成器)内部调用另外生成器(子生成器)的功能,可以轻易的重构生成器,比如将多个生成器连接在一起执行。
Python 3.4 中,新增了 asyncio 库,提供了一个默认的 Event Loop,在语法上支持使用@asyncio.coroutine
和yield from
实现协程,有了足够的基础工具进行异步并发编程。
Python 3.5 中,引入了async
/await
关键字,用以取代@asyncio.coroutine
和yield from
,从而简化了协程的使用并且便于理解。
迭代器和生成器 容器与迭代器 在 Python 中一切皆对象,对象的抽象就是类,而 对象的集合就是容器 ,比如字符串、列表、元组、字典、集合都是容器。不同容器的区别,在于内部数据结构的实现方法。
所有的容器都是 可迭代的(iterable) ,即可迭代对象。这里的迭代,和枚举不完全一样。迭代可以想象成是你去买苹果,卖家并不告诉你他有多少库存。这样,每次你都需要告诉卖家,你要一个苹果,然后卖家采取行为:要么给你拿一个苹果;要么告诉你,苹果已经卖完了。你并不需要知道,卖家在仓库是怎么摆放苹果的。
严谨地说,迭代器(iterator) 提供了一个 _next _ 方法,调用这个方法后,你要么得到这个容器的下一个对象,要么得到一个 StopIteration
异常。对于可迭代对象,通过调用 iter() 函数可以得到一个迭代器,迭代器可以通过 next() 函数来得到下一个元素,从而实现遍历,for...in...
语句就是将这个过程隐式化。
示例:遍历迭代器对象
iterable_list = iter ([1 , 2 ])while True : try : print ('ok' ) print (next (iterable_list)) except StopIteration: print ('empty' ) break
生成器 生成器(generator) 是一种特殊的迭代器,它并不会像迭代器一样占用大量内存。合理使用生成器,可以降低内存占用、优化程序结构、提高程序速度。此外,在 Python 2 的版本中,生成器也是实现l协程的一种重要方式。
示例:生成器与迭代器的性能差异比较
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import osimport psutildef show_memory (hint ): memory = f'{psutil.Process(os.getpid()).memory_full_info().uss / 1024 / 1024 :.4 f} MB' print (f'{hint} memory used: {memory} ' )def test_iterator (): show_memory('init iterator' ) res = [i for i in range (50000000 )] show_memory('after initiated' ) print (f'sum result: {sum (res)} ' ) show_memory('after sum called' ) print ()def test_generator (): show_memory('init generator' ) res = (i for i in range (50000000 )) show_memory('after initiated' ) print (f'sum result: {sum (res)} ' ) show_memory('after sum called' ) print () test_iterator() test_generator()
yield 关键字
yield 应当取其“让步”而非“产出”或“返回”之意,这是理解协程的关键。
yield 有让步之意,因为它交出了程序的控制权,但这个协程并没有结束,下一次执行时,将恢复到之前让出程序控制权的地方,也就是 yield 语句执行的地方继续执行。方法中包含yield
表达式后,Python 会将其视作一个生成器对象,不再是普通的方法。
常用方法说明:
__next__()
方法或next()
: 作用是启动或者恢复 generator 的执行,相当于send(None)
send(value)
方法:作用是发送值给 yield 表达式,启动 generator 会调用send (None)
使用生成器之前需要先调用__next__
或者send(None)
方法,否则将报错。
代码示例 示例一:yield 的简单使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def test (): print ('generator start' ) n = 1 while True : yield_expression_value = yield n print (f'yield_expression_value: {yield_expression_value} ' ) n += 1 test_generator = test()print (type (test_generator)) next_result = next (test_generator) print (f'next_result: {next_result} ' ) send_result = test_generator.send(100 ) print (f'send_result: {send_result} ' ) send_result = next (test_generator) print (f'send_result: {send_result} ' )
示例二:yield 和 send 与外界的交互流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def jump_range (up_to ): step = 0 while step < up_to: jump = yield step print ('jump' , jump) if jump is None : jump = 1 step += jump print ('step' , step) iterator = jump_range(10 )print (next (iterator)) print (iterator.send(4 )) print (next (iterator)) print (iterator.send(-1 ))
示例三:yield 生产者-消费者模型
传统的生产者-消费者模型是一个线程写消息,一个线程读消息,通过锁机制控制队列和等待,但一不小心就可能死锁。如果改用协程,生产者生产消息后,直接通过 yield
跳转到消费者开始执行,待消费者执行完毕后,切换回生产者继续生产,效率极高。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 """示例说明: 1. consumer函数是一个generator 2. 把一个consumer传入produce后,首先调用c.send(None)启动生成器 3. 一旦produce产生了消息,通过c.send(n)切换到consumer执行 4. consumer通过yield拿到消息,进行处理,又通过yield把结果传回 5. produce拿到consumer处理的结果,继续生产下一条消息 6. produce决定不生产了,通过c.close()关闭consumer,整个过程结束 7. 整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务 """ def consumer (): r = '' while True : n = yield r if not n: return print (f'[CONSUMER] Consuming {n} ...' ) r = '200 OK' def produce (c ): c.send(None ) n = 0 while n < 5 : n = n + 1 print (f'[PRODUCER] Producing {n} ...' ) r = c.send(n) print (f'[PRODUCER] Consumer return: {r} ' ) c.close() C = consumer() produce(C)
示例四:yield from 表达式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def gen_n (n ): i = 0 while i < n: yield i i += 1 def main (n ): print ('start' ) yield from gen_n(n) print ('do something' ) yield from gen_n(n) print ('end' )for e in main(3 ): print (e)
asyncio 是用来编写 并发 代码的库,使用 async/await 语法,是实现高性能 Python 异步框架的基础。
可等待对象 如果一个对象可以在 await 语句 中使用,那么它就是 可等待对象(awaitable) 。
对 await 关键字的理解:简单的说,await就是挂起当前任务,去执行其他任务,此时是堵塞的,必须要等其他任务执行完毕才能返回到当前任务继续往下执行,这样的说的前提是,在一个时间循环中有多个 task 或 future,当 await 右面等待的对象是协程对象时,就没有了并发的作用,就是堵塞等待这个协程对象完成。
可等待对象有三种主要类型:协程(Coroutine) 、任务(Task) 、Future 对象 。
协程:Python 协程属于 可等待 对象,因此可以在其他协程中被等待。协程函数的调用不会立即执行,而是会返回一个协程对象。协程对象需要注册到事件循环中,由事件循环调用。
协程函数:定义形式为 async def
的函数 协程对象:调用协程函数所返回的对象
任务:任务 是对协程的进一步封装,其中包含有各种状态,用于 并发 运行多个协程。当一个协程通过 asyncio.create_task()
等函数被封装为一个 任务 ,该协程会被自动调度执行 。
Future 对象:Future 是一种特殊的 低层级 可等待对象,是 Task 的父类,表示一个异步操作的 最终结果 。通常情况下 没有必要 在应用层级的代码中创建 Future 对象。
运行 asyncio 程序
asyncio.run(main, *, debug=False)
创建一个新的事件循环并在结束时将循环关闭。
它应当被用作 asyncio 程序 最高层级的 入口点(main 方法),在理想情况下应当只被调用一次。
当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。
如果 debug 为 True
,事件循环将以调试模式运行。
注:asyncio.run()
是 Python 3.7 中引入的,也是官方 推荐的 运行协程的方式,相当于老版本的以下语句:
1 2 3 4 5 loop = asyncio.get_event_loop()try : loop.run_until_complete(coro)finally : loop.close()
创建任务
asyncio.create_task(coro, *, name=None)
将传入的协程封装为一个 Task,并自动调度它执行,返回 Task 对象。
当 name 不为 None
时,它将使用 Task.set_name()
来设置任务的名称。
同样,此函数在 Python 3.7 中被引入,相当于低层级的 asyncio.ensure_future()
方法。
休眠
asyncio.sleep(delay, result=None, *)
阻塞 delay 指定的秒数。
如果指定了 result ,则当协程完成时将其返回给调用者。
asyncio.sleep()
总是会挂起当前任务,以允许其他任务运行。
将 delay 设为 0 将提供一个经优化的路径以允许其他任务运行,这可供长期间运行的函数使用以避免在函数调用的全过程中阻塞事件循环。
并发运行任务
asyncio.gather(*aws, return_exceptions=False)
并发 运行 aws 序列中的所有可等待对象。
如果 aws 中的某个可等待对象为协程,它将自动被作为一个任务调度。
如果所有可等待对象都成功完成,结果将是一个由所有返回值聚合而成的列表。结果值的顺序与 aws 中可等待对象的顺序一致。
如果 return_exceptions 为 False
,不捕获协程中的异常,错误会完整地 throw 到当前的执行层,但是 aws 序列中的其他可等待对象 不会被取消 并将继续运行(可以在下次 await 的时候执行)。
如果 return_exceptions 为 True
,异常会和成功的结果一样处理,并聚合至结果列表。
示例一:协程的异常处理和取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import asyncioimport timeasync def worker_1 (): await asyncio.sleep(1 ) return 1 async def worker_2 (): await asyncio.sleep(2 ) return 2 / 0 async def worker_3 (): await asyncio.sleep(3 ) return 3 async def main (): task_1 = asyncio.create_task(worker_1()) task_2 = asyncio.create_task(worker_2()) task_3 = asyncio.create_task(worker_3()) await asyncio.sleep(2 ) task_3.cancel() res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True ) print (res) ts = time.time() asyncio.run(main()) print ('all time: {:.2f}s' .format (time.time() - ts))
示例二:协程的并发分组运行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import asyncioasync def coro (tag ): print (">" , tag) await asyncio.sleep(1 ) print ("<" , tag) return tagasync def main (): group1 = asyncio.gather(*[coro('group 1.{}' .format (i)) for i in range (1 , 6 )]) group2 = asyncio.gather(*[coro('group 2.{}' .format (i)) for i in range (1 , 4 )]) group3 = asyncio.gather(*[coro('group 3.{}' .format (i)) for i in range (1 , 10 )]) print ('start' ) group3.cancel() results = await asyncio.gather(group1, group2, group3, return_exceptions=True ) print (results) asyncio.run(main())
单任务等待
asyncio.wait_for(aw, timeout, *)
等待 aw 可等待对象完成,在指定 timeout 秒数后超时。
如果 aw 是一个协程,它将自动被作为任务调度。
如果 timeout 为 None
,则等待直到完成。
如果发生超时,任务将取消并引发 asyncio.TimeoutError
。
要避免任务取消,可以加上 asyncio.shield()
。
示例:单任务超时取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import asyncioasync def eternity (): await asyncio.sleep(3600 ) print ('hi' )async def main (): try : await asyncio.wait_for(eternity(), timeout=1.0 ) except asyncio.TimeoutError: print ('timeout' ) asyncio.run(main())
多任务等待
示例:多任务超时取消
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import asyncioasync def foo (): await asyncio.sleep(1 ) return 42 async def bar (): await asyncio.sleep(3600 ) return 21 async def main (): t1 = asyncio.create_task(foo()) t2 = asyncio.create_task(bar()) done, pending = await asyncio.wait([t1, t2], timeout=2.0 ) if t1 in done: print (f"The task's result is {t1.result()} " ) print (f"The task's status is {t1.done()} " ) for p in pending: p.cancel() print (f'cancel task: {p.get_name()} ' ) await asyncio.sleep(1 ) print (asyncio.all_tasks()) asyncio.run(main())
gather 和 wait 的区别:asyncio.gather()
和 asyncio.wait()
都可以并发执行多个协程对象或任务,但 gather 主要用于收集结果,它会返回一个列表,按照给定的顺序返回结果;而 wait 会返回一个元组(done, pending),分别表示 已完成任务列表 和 未完成任务列表 ,列表中的每个任务都是一个 Task 实例,可以通过 task.result()
获得协程返回值。此外,wait 可以指定函数的返回时机和超时时间,是比 gather 更低层级的调用方式。
内省
流是用于处理网络连接的支持 async/await 的高层级原语。 流允许发送和接收数据,而不需要使用回调或低级协议和传输。
asyncio 队列被设计成与 queue
模块类似,专用于 async/await
代码,但它 不是线程安全 的。asyncio 的队列没有 timeout 形参,使用 asyncio.wait_for()
函数为队列添加超时操作。
使用说明
asyncio.queues 模块提供的类:
Queue(maxsize=0, *):先进先出队列
LifoQueue(maxsize=0, *):后进先出队列
PriorityQueue(maxsize=0, *):优先级队列
asyncio.queues 模块提供的常用方法:
1 2 3 4 5 6 7 8 9 queue.qsize() queue.empty() queue.full() queue.put(item) queue.put_nowait(item) queue.get() queue.get_nowait() queue.task_done() queue.join()
示例:多个并发任务的工作量分配
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import asyncioimport randomimport timeasync def worker (name, queue ): while True : sleep_for = await queue.get() await asyncio.sleep(sleep_for) queue.task_done() print (f'{name} has slept for {sleep_for:.2 f} seconds' )async def main (): queue = asyncio.Queue() total_sleep_time = 0 for _ in range (20 ): sleep_for = random.uniform(0.05 , 1.0 ) total_sleep_time += sleep_for queue.put_nowait(sleep_for) tasks = [] for i in range (3 ): task = asyncio.create_task(worker(f'worker-{i} ' , queue)) tasks.append(task) started_at = time.monotonic() await queue.join() total_slept_for = time.monotonic() - started_at for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True ) print ('====' ) print (f'3 workers slept in parallel for {total_slept_for:.2 f} seconds' ) print (f'total expected sleep time: {total_sleep_time:.2 f} seconds' ) asyncio.run(main())
queue.Queue & asyncio.Queue
queue.Queue 和 asyncio.Queue 都是支持多生产者、多消费者的队列,同样是基于 collections.deque 实现,他们都提供了 Queue、LifoQueue、PriorityQueue,所提供的接口也相同。
区别在于 queue.Queue 适用于多线程场景的通信,asyncio.Queue 适用于协程场景的通信。由于 asyncio 的加成,queue.Queue 下的阻塞接口在 asyncio.Queue 中则是以返回协程对象的方式执行,具体差异如下表:
Semaphore
asyncio.Semaphore(value=1)
信号量对象,非线程安全。
信号量会管理一个内部计数器,该计数器会随每次 acquire()
调用递减并随每次 release()
调用递增。计数器的值永远不会降到零以下;当 acquire()
发现其值为零时,它将保持阻塞直到有某个任务调用了 release()
。
使用 Semaphore 的推荐方式是通过 async with
语句:
1 2 3 4 5 6 7 8 9 10 11 sem = asyncio.Semaphore(10 )async with sem: ---------await sem.acquire()try : finally : sem.release()
示例:协程使用 Semaphore 限制并发量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 import asyncioasync def hello (sem ): async with sem: print (asyncio.current_task()) await asyncio.sleep(1 )async def main (): tasks = [] sem = asyncio.Semaphore(10 ) for i in range (100 ): tasks.append(asyncio.create_task(hello(sem))) await asyncio.gather(*tasks) asyncio.run(main())
本节介绍了用于创建和管理子进程的高层级 async/await asyncio API。
异步迭代器 async for
async for
语句允许方便地对异步可迭代对象进行迭代。
异步上下文管理器 async with
async with
语句是一种上下文管理器,能够在其 enter 和 exit 方法中暂停执行。
代码示例 示例一:使用协程方法替换普通方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import timedef crawl_page (url ): print ('crawling {}' .format (url)) sleep_time = int (url.split('_' )[-1 ]) time.sleep(sleep_time) print ('OK {}' .format (url))def main (urls ): for url in urls: crawl_page(url) ts = time.time() main(['url_1' , 'url_2' , 'url_3' , 'url_4' ])print ('all time: {:.2f}s' .format (time.time() - ts)) ---------import asyncioimport timeasync def crawl_page (url ): print ('crawling {}' .format (url)) sleep_time = int (url.split('_' )[-1 ]) await asyncio.sleep(sleep_time) print ('OK {}' .format (url))async def main (urls ): tasks = [asyncio.create_task(crawl_page(url)) for url in urls] for task in tasks: await task ts = time.time() asyncio.run(main(['url_1' , 'url_2' , 'url_3' , 'url_4' ]))print ('all time: {:.2f}s' .format (time.time() - ts))
示例二:使用协程实现生产者-消费者模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import asyncioimport randomasync def consumer (queue, number ): while True : val = await queue.get() print ('{} get a val: {}' .format (number, val)) await asyncio.sleep(1 )async def producer (queue, number ): for i in range (5 ): val = random.randint(1 , 10 ) await queue.put(val) print ('{} put a val: {}' .format (number, val)) await asyncio.sleep(1 )async def main (): queue = asyncio.Queue() consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1' )) consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2' )) producer_1 = asyncio.create_task(producer(queue, 'producer_1' )) producer_2 = asyncio.create_task(producer(queue, 'producer_2' )) await asyncio.sleep(10 ) consumer_1.cancel() consumer_2.cancel() await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True ) asyncio.run(main())
事件循环是每个 asyncio 应用的核心。 事件循环会运行异步任务和回调,执行网络 IO 操作,以及运行子进程。
应用开发者通常应当使用高层级的 asyncio 函数,例如 asyncio.run()
,应当很少有必要引用循环对象或调用其方法。 本节所针对的主要是低层级代码、库和框架的编写者,他们需要更细致地控制事件循环行为。
以下低层级函数可被用于获取、设置或创建事件循环:
asyncio.``get_running_loop
()
返回当前 OS 线程中正在运行的事件循环。如果没有正在运行的事件循环则会引发 RuntimeError
。 此函数只能由协程或回调来调用。3.7 新版功能.
asyncio.``get_event_loop
()
获取当前事件循环。如果当前 OS 线程没有设置当前事件循环,该 OS 线程为主线程,并且 set_event_loop()
还没有被调用,则 asyncio 将创建一个新的事件循环并将其设为当前事件循环。由于此函数具有相当复杂的行为(特别是在使用了自定义事件循环策略的时候),更推荐在协程和回调中使用 get_running_loop()
函数而非 get_event_loop()
。应该考虑使用 asyncio.run()
函数而非使用低层级函数来手动创建和关闭事件循环。3.10 版后已移除: 如果没有正在运行的事件循环则会发出弃用警告。 在未来的 Python 发行版中,此函数将成为 get_running_loop()
的别名。
asyncio.``set_event_loop
(loop )
将 loop 设置为当前 OS 线程的当前事件循环。
asyncio.``new_event_loop
()
创建一个新的事件循环。
基于 asyncio 框架实现的异步 HTTP 客户端/服务器,用于替代 requests 库。
代码示例 示例一:简单使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 import asyncioimport aiohttpasync def main (): async with aiohttp.ClientSession() as session: async with session.get('http://httpbin.org/get' ) as response: print (f'Status: {response.status} ' ) print (f"Content-type: {response.headers['content-type' ]} " ) html = await response.text() print (f'Body: {html} ' ) asyncio.run(main()) ---------from aiohttp import webasync def handle (request ): name = request.match_info.get('name' , "Anonymous" ) text = "Hello, " + name return web.Response(text=text) app = web.Application() app.add_routes([web.get('/' , handle), web.get('/{name}' , handle)])if __name__ == '__main__' : web.run_app(app, host='127.0.0.1' , port=9643 )
示例二:ClientSession 常用方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 import asynciofrom aiohttp import ClientSessionasync def get (url, session: ClientSession ): params_list = [ {'key1' : 'value1' , 'key2' : 'value2' }, [('key' , 'value1' ), ('key' , 'value2' )], 'key1=value1&key2=value2' ] headers = { "User-Agent" : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/78.0.3904.108 Safari/537.36" } for params in params_list: async with session.get(url, params=params, headers=headers) as r: print (f'url: {r.url} ' ) print (f'status: {r.status} ' ) print (f'headers: {r.headers} ' ) print (f'requestInfo: {r.request_info} ' ) res = await r.text(encoding='utf-8' ) print (res)async def post (url, session: ClientSession ): data_list = [ 'test' , b'test' , {'test' : 'object' }, ] for data in data_list: async with session.post(url, data=data) as r: print (f'url: {r.url} ' ) print (await r.text()) async with session.post(url, json={'test' : 'object' }) as r: print (f'url: {r.url} ' ) print (await r.text())async def cookie (url, session: ClientSession ): async with session.get(url) as r: print (f'url: {r.url} ' ) print (await r.json()) async def main (): cookies = {'cookies_are' : 'working' } async with ClientSession(cookies=cookies) as session: await get('http://httpbin.org/get' , session) await post('http://httpbin.org/post' , session) await cookie('http://httpbin.org/cookies' , session) asyncio.run(main())
示例三:aiohttp 限制并发连接数和超时时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 import asyncioimport timefrom aiohttp import ClientSession, ClientTimeout, TCPConnector, client_exceptionsasync def get (): conn = TCPConnector(limit=0 ) timeout = ClientTimeout(total=10 , connect=2 , sock_connect=2 , sock_read=10 ) async with ClientSession(connector=conn, timeout=timeout) as session: try : async with session.get('http://127.0.0.1:9643/test' ) as response: print (f'Body: {await response.text()} ' ) except client_exceptions.ServerTimeoutError as e: print (f'Request timeout: {e} ' ) finally : pass async def main (): st = time.perf_counter() try : await asyncio.wait_for(get(), timeout=10 ) except asyncio.exceptions.TimeoutError: print ('Coroutines timeout' ) finally : print (f'time: {time.perf_counter() - st} s' ) asyncio.run(main())