300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > 【进阶】 --- 多线程 多进程 异步IO实用例子

【进阶】 --- 多线程 多进程 异步IO实用例子

时间:2023-05-03 08:44:04

相关推荐

【进阶】 --- 多线程 多进程 异步IO实用例子

【进阶】 --- 多线程、多进程、异步IO实用例子:/lu8000/article/details/82315576

python之爬虫_并发(串行、多线程、多进程、异步IO):/fat39/archive//01/13/9044474.html

Python 并发总结,多线程,多进程,异步IO:/junmoxiao/p/11948993.html

asyncio --- 异步 I/O 官方文档:/zh-cn/3.10/library/asyncio.html

关于asyncio异步io并发编程:/p/158641367

支持asyncio 的异步Python库:/aio-libs

知乎专栏:Python爬虫深入详解Python中协程异步IO(asyncio)详解/p/59621713

asyncio:异步I/O、事件循环和并发工具:/sidianok/p/12210857.html

在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢。以下代码默认运行环境为 python3。

httpie:HTTPie使用详解:/p/45093545grequests,Requests + Gevent,访问:/kennethreitz/grequestsgevent,一个高并发的网络性能库,访问:/twisted,基于事件驱动的网络引擎框架。访问:/trac/

目录

一、多线程、多进程

1.同步执行

2.多线程执行

3.多线程+回调函数执行

4.多进程执行

5.多进程+回调函数执行

二、异步

1.asyncio 示例 1

asyncio 示例 2

python异步编程之asyncio(百万并发)

学习 python 高并发模块 asynio

2.asyncio + aiohttp

3.asyncio + requests

4.gevent + requests

5.grequests

6.Twisted示例

7.Tornado

8.Twisted更多

9.史上最牛逼的异步 IO 模块

一、多线程、多进程

1. 同步执行

示例 1(同步执行 )

import requestsdef fetch_sync(r_url=None):response = requests.get(r_url, verify=False)return responseif __name__ == '__main__':url_list = ['', '']for url in url_list:fetch_sync(url)

示例 2(同步执行 )

import requestsimport timefrom lxml import etreeurls = ['/Jmilk/article/details/103218919','/stven_king/article/details/103256724','/csdnnews/article/details/103154693','/dg_lee/article/details/103951021','/m0_37907797/article/details/103272967','/zzq900503/article/details/49618605','/weixin_44339238/article/details/103977138','/dengjin4042056/article/details/103930275','/Mind_programmonkey/article/details/103940511','/xufive/article/details/102993570','/weixin_41010294/article/details/104009722','/yunqiinsight/article/details/103137022','/qq_44210563/article/details/102826406',]def get_title(url: str):headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ''(KHTML, like Gecko) Chrome/86.0.4240.183 Safari/537.36'}r = requests.get(url, headers=headers)if 200 == r.status_code:title = etree.HTML(r.content).xpath('//h1[@class="title-article"]/text()')[0]print(title)else:print(f'[status_code:{r.status_code}]:{r.url}')def main():for url in urls:get_title(url)if __name__ == '__main__':start = time.time()main()print(f'cost time: {time.time() - start}s')

使用 httpx 模块的同步调用(httpx 即可同步,也可异步

import timeimport httpxdef make_request(client):resp = client.get('/get')result = resp.json()print(f'status_code : {resp.status_code}')assert 200 == resp.status_codedef main():session = httpx.Client()# 100 次调用for _ in range(100):make_request(session)if __name__ == '__main__':# 开始start = time.time()main()# 结束end = time.time()print(f'同步:发送100次请求,耗时:{end - start}')

2. 多线程执行(线程池)

from concurrent.futures import ThreadPoolExecutorimport requestsdef fetch_sync(r_url):response = requests.get(r_url)return responseurl_list = ['', '']pool = ThreadPoolExecutor(5)for url in url_list:pool.submit(fetch_sync, url)pool.shutdown(wait=True)

3. 多线程 + 回调函数执行

from concurrent.futures import ThreadPoolExecutorimport requestsdef fetch_sync(r_url):response = requests.get(r_url, verify=False)return responsedef callback(future):print(future.result())url_list = ['', '']pool = ThreadPoolExecutor(5)for url in url_list:v = pool.submit(fetch_sync, url)v.add_done_callback(callback)pool.shutdown(wait=True)

4. 多进程执行

import requestsfrom concurrent import futuresdef fetch_sync(r_url):response = requests.get(r_url)return responseif __name__ == '__main__':url_list = ['', '']with futures.ProcessPoolExecutor(5) as executor:res = [executor.submit(fetch_sync, url) for url in url_list]print(res)

示例 :

import requestsfrom concurrent import futuresimport timeimport urllib3urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)def fetch_sync(args):i, r_url = argsprint(f'index : {i}')response = requests.get(r_url, verify=False)time.sleep(2)return response.status_codedef callback(future):print(future.result())if __name__ == '__main__':# url_list = ['', '']url = ''with futures.ProcessPoolExecutor(5) as executor:for index in range(1000):v = executor.submit(fetch_sync, (index, url))v.add_done_callback(callback)pass

5. 多进程 + 回调函数执行

import requestsfrom concurrent import futuresdef fetch_sync(r_url):response = requests.get(r_url, verify=False)return responsedef callback(future):print(future.result())if __name__ == '__main__':url_list = ['', '']with futures.ProcessPoolExecutor(5) as executor:for url in url_list:v = executor.submit(fetch_sync, url)v.add_done_callback(callback)pass

二、异步

对于事件循环可以动态的增加协程事件循环中,而不是在一开始就确定所有需要协程

协程只运行在事件循环中。

默认情况下asyncio.get_event_loop()是一个select模型事件循环

默认的asyncio.get_event_loop()事件循环属于主线程。

参考 python asyncio 协程:/dashoumeixi/article/details/81001681

一般是一个线程一个事件循环,为什么要一个线程一个事件循环?

[ 这个东西不去了解完全不会影响 asyncio 包的学习,如果不懂的话记住一个线程一个事件循环就得了]

如果你要使用多个事件循环 ,创建线程后调用

lp = asyncio.new_event_loop() #创建一个新的事件循环asyncio.set_event_loop(lp) #设置当前线程的事件循环

核心思想: yield from / await 就这2个关键字,运行(驱动)一个协程,同时交出当前函数的控制权,让事件循环执行下个任务。

yield from 的实现原理:yield from实现:/dashoumeixi/article/details/84076812

要搞懂 asyncio 协程,还是先把生成器弄懂,如果对生成器很模糊,比如 yield from 生成器对象,这个看不懂的话,建议先看 :python生成器 yield from:/dashoumeixi/article/details/80936798

有 2 种方式让协程运行起来,协程(生成器)本身是不运行的

1. await / yield from协程,这一组能等待协程完成。2. asyncio.ensure_future / async(协程) ,这一组不需要等待协程完成。

注意:

1. 协程就是生成器的增强版 ( 多了send 与 yield 的接收 ),在 asyncio 中的协程 与 生成器对象不同点:

asyncio协程:函数内部不能使用 yield [如果使用会抛RuntimeError],只能使用 yield from / await,

一般的生成器: yield 或 yield from 2个都能用,至少使用一个。这 2个本来就是一回事,协程只是不能使用 yield 2. 在 asycio 中所有的协程都被自动包装成一个个Task / Future对象,但其本质还是一个生成器,因此可以 yield from / awaitTask/Futrue

基本流程:

1. 定义一个协程 (async def 或者 @asyncio.coroutine 装饰的函数)2. 调用上述函数,获取一个协程对象 【不能使用yield,除非你自己写异步模块,毕竟最终所调用的还是基于yield的生成器函数】。通过asyncio.ensure_futureasyncio.async函数调度协程(这部意味着要开始执行了) ,返回了一个 Task 对象,Task对象 是 Future对象 的 子类,(这步可作可不作,只要是一个协程对象,一旦扔进事件队列中,将自动给你封装成Task对象)3. 获取一个事件循环asyncio.get_event_loop() ,默认此事件循环属于主线程4. 等待事件循环调度协程

后面的例子着重说明了一下 as_completed,附加了源码。 先说明一下:

1. as_completed 每次迭代返回一个协程,2. 这个协程内部从 Queue 中取出先完成的 Future 对象3. 然后我们再 await coroutine

示例 1:

import asyncio"""第一个例子没什么用.注意: 协程 与 生成器 的用法是一样的. 需要调用之后才产生对象. """async def func():print('hi')lp = asyncio.get_event_loop() # 获取事件循环# 放到进事件循环里.注意,func() 而不是func. 需要调用之后才是协程对象.lp.run_until_complete(func())

示例 2:

import asyncio"""用async def (新语法) 定义一个函数,同时返回值asyncio.sleep 模拟IO阻塞情况 ; await 相当于 yield from.await 或者 yield from 交出函数控制权(中断),让事件循环执行下个任务 ,一边等待后面的协程完成"""async def func(i):print('start')await asyncio.sleep(i) # 交出控制权print('done')return ico = func(2) # 产生协程对象print(co)lp = asyncio.get_event_loop()# 获取事件循环task = asyncio.ensure_future(co) # 开始调度lp.run_until_complete(task) # 等待完成print(task.result()) # 获取结果

添加回调

示例 1:

import asyncio"""添加一个回调:add_done_callback"""async def func(i):print('start')await asyncio.sleep(i)return idef call_back(v):print('callback , arg:', v, 'result:', v.result())if __name__ == '__main__':co = func(2) # 产生协程对象lp = asyncio.get_event_loop() # 获取事件循环# task = asyncio.run_coroutine_threadsafe(co) # 开始调度task = asyncio.ensure_future(co) # 开始调度task.add_done_callback(call_back) # 增加回调lp.run_until_complete(task) # 等待print(task.result()) # 获取结果

子协程调用原理图

官方的一个实例如下

从下面的原理图我们可以看到

1 当事件循环处于运行状态的时候,任务Task 处于pending(等待),会把控制权交给委托生成器 print_sum2委托生成器 print_sum 会建立一个双向通道为Task和子生成器,调用子生成器compute并把值传递过去3子生成器compute会通过委托生成器建立的双向通道把自己当前的状态suspending(暂停)传给Task,Task 告诉 loop 它数据还没处理完成4loop 会循环检测 Task ,Task 通过双向通道去看自生成器是否处理完成5 子生成器处理完成后会向委托生成器抛出一个异常和计算的值,并关闭生成器6 委托生成器再把异常抛给任务(Task),把任务关闭7loop 停止循环

call_soon、call_at、call_later、call_soon_threadsafe

call_soon 循环开始检测时,立即执行一个回调函数call_at 循环开始的第几秒s执行call_later 循环开始后10s后执行call_soom_threadsafe 立即执行一个安全的线程

import asyncioimport timedef call_back(str_var, loop):print("success time {}".format(str_var))def stop_loop(str_var, loop):time.sleep(str_var)loop.stop()# call_later, call_atif __name__ == "__main__":event_loop = asyncio.get_event_loop()event_loop.call_soon(call_back, 'loop 循环开始检测立即执行', event_loop)now = event_loop.time() # loop 循环时间event_loop.call_at(now + 2, call_back, 2, event_loop)event_loop.call_at(now + 1, call_back, 1, event_loop)event_loop.call_at(now + 3, call_back, 3, event_loop)event_loop.call_later(6, call_back, "6s后执行", event_loop)# event_loop.call_soon_threadsafe(stop_loop, event_loop)event_loop.run_forever()

不同线程中的事件循环

事件循环中维护了一个队列(FIFO, Queue),通过另一种方式来调用:

import timeimport datetimeimport asyncio"""事件循环中维护了一个FIFO队列通过call_soon 通知事件循环来调度一个函数."""def func(x):print(f'x:{x}, start time:{datetime.datetime.now().replace(microsecond=0)}')time.sleep(x)print(f'func invoked:{x}')loop = asyncio.get_event_loop()loop.call_soon(func, 1) # 调度一个函数loop.call_soon(func, 2)loop.call_soon(func, 3)loop.run_forever() # 阻塞'''x:1, start time:-10-01 15:45:46func invoked:1x:2, start time:-10-01 15:45:47func invoked:2x:3, start time:-10-01 15:45:49func invoked:3'''

可以看到以上操作是同步的。下面通过asyncio.run_coroutine_threadsafe函数可以把上述函数调度变成异步执行:

import timeimport datetimeimport asyncio"""1.首先会调用asyncio.run_coroutine_threadsafe 这个函数.2.之前的普通函数修改成协程对象"""async def func(x):print(f'x:{x}, start time:{datetime.datetime.now().replace(microsecond=0)}')await asyncio.sleep(x)print(f'func invoked:{x}, now:{datetime.datetime.now().replace(microsecond=0)}')loop = asyncio.get_event_loop()co1 = func(1)co2 = func(2)co3 = func(3)asyncio.run_coroutine_threadsafe(co1, loop) # 调度asyncio.run_coroutine_threadsafe(co2, loop)asyncio.run_coroutine_threadsafe(co3, loop)loop.run_forever() # 阻塞'''x:1, start time:-10-01 15:49:32x:2, start time:-10-01 15:49:32x:3, start time:-10-01 15:49:32func invoked:1, now:-10-01 15:49:33func invoked:2, now:-10-01 15:49:34func invoked:3, now:-10-01 15:49:35'''

上面 2 个例子只是告诉你 2 件事情。

1.run_coroutine_threadsafe异步线程安全call_soon同步。2.run_coroutine_threadsafe 这个函数 对应 ensure_future (只能作用于同一线程中)

可以在一个子线程中运行一个事件循环,然后在主线程中动态的添加协程,这样既不阻塞主线程执行其他任务,子线程也可以异步的执行协程。

注意:默认情况下获取的 event_loop 是主线程的,所以要在子线程中使用 event_loop 需要 new_event_loop 。如果在子线程中直接获取 event_loop 会抛异常 。

源代码中的判断:isinstance(threading.current_thread(), threading._MainThread)

示例:

import osimport sysimport queueimport threadingimport timeimport datetimeimport asyncio"""1. call_soon , call_soon_threadsafe 是同步的2. asyncio.run_coroutine_threadsafe(coro, loop) -> 对应 asyncio.ensure_future是在 事件循环中 异步执行。"""# 在子线程中执行一个事件循环 , 注意需要一个新的事件循环def thread_loop(loop: asyncio.AbstractEventLoop):print('线程开启 tid:', threading.currentThread().ident)asyncio.set_event_loop(loop) # 设置一个新的事件循环loop.run_forever() # run_forever 是阻塞函数,所以,子线程不会退出。async def func(x, q):current_time = datetime.datetime.now().replace(microsecond=0)msg = f'func: {x}, time:{current_time}, tid:{threading.currentThread().ident}'print(msg)await asyncio.sleep(x)q.put(x)if __name__ == '__main__':temp_queue = queue.Queue()lp = asyncio.new_event_loop() # 新建一个事件循环, 如果使用默认的, 则不能放入子线程thread_1 = threading.Thread(target=thread_loop, args=(lp,))thread_1.start()co1 = func(2, temp_queue) # 2个协程co2 = func(3, temp_queue)asyncio.run_coroutine_threadsafe(co1, lp) # 开始调度在子线程中的事件循环asyncio.run_coroutine_threadsafe(co2, lp)print(f'开始事件:{datetime.datetime.now().replace(microsecond=0)}')while 1:if temp_queue.empty():print('队列为空,睡1秒继续...')time.sleep(1)continuex = temp_queue.get() # 如果为空,get函数会直接阻塞,不往下执行current_time = datetime.datetime.now().replace(microsecond=0)msg = f'main :{x}, time:{current_time}'print(msg)time.sleep(1)

下面例子中asyncio.ensure_future/async都可以换成asyncio.run_coroutine_threadsafe【在不同线程中的事件循环 】:

ThreadPollExecutor 和 asyncio 完成阻塞 IO 请求

在 asyncio 中集成线程池处理耗时IO

在协程中同步阻塞的写法,但有些时候不得已就是一些同步耗时的接口

可以把线程池集成到asynico模块中

import asynciofrom concurrent import futurestask_list = []loop = asyncio.get_event_loop()executor = futures.ThreadPoolExecutor(3)def get_url(t_url=None):print(t_url)for url in range(20):url = "/goods/{}/".format(url)task = loop.run_in_executor(executor, get_url, url)task_list.append(task)loop.run_until_complete(asyncio.wait(task_list))

示例代码:

# 使用多线程:在 协程 中集成阻塞ioimport asynciofrom concurrent.futures import ThreadPoolExecutorimport socketfrom urllib.parse import urlparsedef get_url(url):# 通过socket请求htmlurl = urlparse(url)host = locpath = url.pathif path == "":path = "/"# 建立socket连接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# client.setblocking(False)client.connect((host, 80)) # 阻塞不会消耗cpu# 不停的询问连接是否建立好, 需要while循环不停的去检查状态# 做计算任务或者再次发起其他的连接请求client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)client.close()if __name__ == "__main__":import timestart_time = time.time()loop = asyncio.get_event_loop()executor = ThreadPoolExecutor(3)tasks = []for url in range(20):url = "/goods/{}/".format(url)task = loop.run_in_executor(executor, get_url, url)tasks.append(task)loop.run_until_complete(asyncio.wait(tasks))print("last time:{}".format(time.time() - start_time))

不用集成也是可以的,但是要在函数的前面加上 async 使同步变成异步写法

#使用多线程:在携程中集成阻塞ioimport asynciofrom concurrent.futures import ThreadPoolExecutorimport socketfrom urllib.parse import urlparseimport timeasync def get_html(url):#通过socket请求htmlurl = urlparse(url)host = locpath = url.pathif path == "":path = "/"#建立socket连接client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# client.setblocking(False)client.connect((host, 80)) #阻塞不会消耗cpu#不停的询问连接是否建立好, 需要while循环不停的去检查状态#做计算任务或者再次发起其他的连接请求client.send("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))data = b""while True:d = client.recv(1024)if d:data += delse:breakdata = data.decode("utf8")html_data = data.split("\r\n\r\n")[1]print(html_data)client.close()if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()tasks = [get_html("/goods/2/") for i in range(10)]loop.run_until_complete(asyncio.wait(tasks))print(time.time() - start_time)

asyncio 的 同步 和 通信

在多少线程中考虑安全性,需要加锁,在协程中是不需要的

import asynciototal = 0lock = Noneasync def add():global totalfor _ in range(1000):total += 1async def desc():global total, lockfor _ in range(1000):total -= 1if __name__ == '__main__':tasks = [add(), desc()]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))print(total)

在有些情况中,对协程还是需要类似锁的机制

示例:parse_response 和 use_response 有共同调用的代码,get_response、parse_response 去请求的时候 如果 get_response 也去请求,会触发网站的反爬虫机制.

这就需要我们像上诉代码那样加 lock,同时 get_response 和 use_response中都调用了parse_response,我们想在 get_response 中只请求一次,下次用缓存,所以要用到锁

import asyncioimport aiohttpfrom asyncio import Lockcache = {}lock = Lock()async def get_response(url):async with lock: # 等价于 with await lock: 还有async for 。。。类似的用法# 这里使用async with 是因为 Lock中有__await__ 和 __aenter__两个魔法方法# 和线程一样, 这里也可以用 await lock.acquire() 并在结束时 lock.releaseif url in cache:return cache[url]print("第一次请求")response = aiohttp.request('GET', url)cache[url] = responsereturn responseasync def parse_response(url):response = await get_response(url)print('parse_response', response)# do some parseasync def use_response(url):response = await get_response(url)print('use_response', response)# use response to do something interestingif __name__ == '__main__':tasks = [parse_response('baidu'), use_response('baidu')]loop = asyncio.get_event_loop()# loop.run_until_complete将task放到loop中,进行事件循环, 这里必须传入的是一个listloop.run_until_complete(asyncio.wait(tasks))

输出结果如下

asyncio 通信 queue

协程是单线程的,所以协程中完全可以使用全局变量实现 queue 来相互通信,但是如果想要在 queue 中定义存放有限的最大数目,需要在 put 和 get 的前面都要加 await

from asyncio import Queuequeue = Queue(maxsize=3)await queue.get()await queue.put()

一个事件循环中执行多个 task,实现并发执行

futuretask

future是一个结果的容器,结果执行完后在内部会回调 call_back 函数taskfuture的子类,可以用来激活协程。(task协程Future桥梁

waitgather、await

1.waitgather这2个函数都是用于获取结果的,且都不阻塞,直接返回一个生成器对象可用于 yield from / await

2. 两种用法可以获取执行完成后的结果:

第一种:result = asyncio.run_until_completed(asyncio.wait/gather)执行完成所有之后获取结果

第二种:result = awaitasyncio.wait/gather在一个协程内获取结果

3. as_completed 与并发包 concurrent 中的行为类似,哪个任务先完成哪个先返回,内部实现是 yield from Queue.get()

4. 嵌套:await / yield from 后跟协程,直到后面的协程运行完毕,才执行await/yield from下面的代码整个过程是不阻塞的

wait 和 gather 区别

这两个都可以添加多个任务到事件循环中

一般使用 asyncio.wait(tasks) 的地方也可以使用 asyncio.gather(tasks) ,但是wait接收一堆 task,gather接收一个 task 列表。

asyncio.wait(tasks)方法返回值是两组 task/future的 set。dones, pendings = await asyncio.wait(tasks)其中

dones 是 task的 set,pendings 是 future 的 set。

asyncio.gather(tasks) 返回一个结果的 list。

gather 比 wait 更加的高级

可以对任务进行分组可以取消任务

import asyncioimport timeasync def get_html(url):global indexprint(f"{index} start get url")await asyncio.sleep(2)index += 1print(f"{index} end get url")if __name__ == "__main__":start_time = time.time()index = 1loop = asyncio.get_event_loop()tasks = [get_html("") for i in range(10)]# gather和wait的区别# tasks = [get_html("") for i in range(10)]# loop.run_until_complete(asyncio.wait(tasks))group1 = [get_html("") for i in range(2)]group2 = [get_html("") for i in range(2)]group1 = asyncio.gather(*group1)group2 = asyncio.gather(*group2)loop.run_until_complete(asyncio.gather(group1, group2))print(time.time() - start_time)

asf

示例 1:

import asyncio"""并发 执行多个任务。调度一个Task对象列表调用 asyncio.wait 或者 asyncio.gather 获取结果"""async def func(i):print('start')# 交出控制权,事件循环执行下个任务,同时等待完成await asyncio.sleep(i)return iasync def func_sleep():await asyncio.sleep(2)def test_1():# asyncio create_task永远运行# /blog/article/160584/ca5dc07f62899cedad64/lp = asyncio.get_event_loop()tasks = [lp.create_task(func(i)) for i in range(3)]lp.run_until_complete(func_sleep())# 或者# lp.run_until_complete(asyncio.wait([func_sleep(), ]))def test_2(): lp = asyncio.get_event_loop()# tasks = [func(i) for i in range(3)]# tasks = [asyncio.ensure_future(func(i)) for i in range(3)] # asyncio.ensure_future# 或者tasks = [lp.create_task(func(i)) for i in range(3)] # lp.create_tasklp.run_until_complete(asyncio.wait(tasks))for task in tasks:print(task.result())if __name__ == '__main__':# test_1()test_2()pass

示例 2:

import asyncio"""通过 await 或者 yield from 形成1个链, 后面跟其他协程. 形成一个链的目的很简单,当前协程需要这个结果才能继续执行下去.就跟普通函数调用其他函数获取结果一样"""async def func(i):print('start')await asyncio.sleep(i)return iasync def to_do():print('to_do start')tasks = []# 开始调度3个协程对象for i in range(3):tasks.append(asyncio.ensure_future(func(i)))# 在协程内等待结果. 通过 await 来交出控制权, 同时等待tasks完成task_done, task_pending = await asyncio.wait(tasks)print('to_do get result')# 获取已经完成的任务for task in task_done:print('task_done:', task.result())# 未完成的for task in task_pending:print('pending:', task)if __name__ == '__main__':lp = asyncio.get_event_loop() # 获取事件循环lp.run_until_complete(to_do()) # 把协程对象放进去# lp.close() # 关闭事件循环

as_completed 函数返回一个迭代器,每次迭代一个协程。

事件循环内部有一个 Queue(queue.Queue 线程安全) ,先完成的先入队。

as_completed 迭代的协程源码是 : 注意 yield from 后面可以跟 iterable

#简化版代码f = yield from done.get() # done 是 Queuereturn f.result()

例子:

asyncio.as_completed 返回一个生成器对象 , 因此可用于迭代

每次从此生成器中返回的对象是一个个协程(生成器),哪个最先完成哪个就返回,而要从 生成器/协程 中获取返回值,就必须使用 yield from / await , 简单来说就是:生成器的返回值在异常中,详情参考最上面的基础链接

import asyncioasync def func(x):# print('\t\tstart ',x)await asyncio.sleep(5)# print('\t\tdone ', x)return xasync def to_do():# 在协程内调度2个协程tasks = [asyncio.ensure_future(func(i)) for i in range(2)]# 使用as_completed:先完成,先返回.# 每次迭代返回一个协程.# 这个协程:_wait_for_one,内部从队列中产出一个最先完成的Future对象for coroutine in asyncio.as_completed(tasks):result = await coroutine # 等待协程,并返回先完成的协程print('result :', result)print('all done')lp = asyncio.get_event_loop()lp.set_debug(True)lp.run_until_complete(to_do()) # 调度协程

获取多个并发的 task 的结果。

task协程Future桥梁

如果我们要获取 task 的结果,一定要创建一个task,就是把我们的协程绑定要 task 上,这里直接用事件循环 loop 里面的 create_task 就可以搞定。

我们假设有3个并发的add任务需要处理,然后调用 run_until_complete 来等待3个并发任务完成。

调用 task.result 查看结果,这里的 task 其实是 _asyncio.Task,是封装的一个类。大家可以在 Pycharm 中找 asyncio 里面的源码,里面有一个 tasks 文件。

爬取有道词典

玩并发比较多的是爬虫,爬虫可以用多线程,线程池去爬。但是我们用 requests 的时候是阻塞的,无法并发。所以我们要用一个更牛逼的库 aiohttp,这个库可以当成是异步的 requests。

1). 爬取有道词典

有道翻译的API已经做好了,我们可以直接调用爬取。然后解析网页,获取单词的翻译。然后解析网页,网页比较简单,可以有很多方法解析。因为爬虫文章已经泛滥了,我这里就不展开了,很容易就可以获取单词的解释。

2). 代码的核心框架

设计一个异步的框架,生成一个事件循环

创建一个专门去爬取网页的协程,利用aiohttp去爬取网站内容

生成多个要翻译的单词的url地址,组建一个异步的tasks, 扔到事件循环里面

等待所有的页面爬取完毕,然后用pyquery去一一解析网页,获取单词的解释,部分代码如下:

import timeimport asyncioimport aiohttpfrom pyquery import PyQuery as pqdef decode_html(html_content=None):url, resp_text = html_contentdoc = pq(resp_text)des = ''for li in doc.items('#phrsListTab .trans-container ul li'):des += li.text()return url, desasync def fetch(session: aiohttp.ClientSession = None, url=None):async with session.get(url=url) as resp:resp_text = await resp.text()return url, resp_textasync def main(word_list=None):url_list = ['/w/{}'.format(word) for word in word_list]temp_task_list = []async with aiohttp.ClientSession() as session:for url in url_list:temp_task_list.append(fetch(session, url))html_list = await asyncio.gather(*temp_task_list)for html_content in html_list:print(decode_html(html_content))if __name__ == '__main__':start_time = time.time()text = 'apple'word_list_1 = [ch for ch in text]word_list_2 = [text for _ in range(100)]loop = asyncio.get_event_loop()task_list = [main(word_list_1),main(word_list_2),]loop.run_until_complete(asyncio.wait(task_list))print(time.time() - start_time)

谈到 http 接口调用,Requests 大家并不陌生,例如,robotframework-requests、HttpRunner 等 HTTP 接口测试库/框架都是基于它开发。

这里将介绍另一款http接口测试框架 httpx,snaic 同样也集成了 httpx 库。httpx 的 API 和 Requests 高度一致。github:/encode/httpx

安装:pip install httpx

httpx 简单使用

import jsonimport httpxr = httpx.get("/get")print(r.status_code)print(json.dumps(r.json(), ensure_ascii=False, indent=4))

带参数的 post 调用

import jsonimport httpxpayload = {'key1': 'value1', 'key2': 'value2'}r = httpx.post("/post", data=payload)print(r.status_code)print(json.dumps(r.json(), ensure_ascii=False, indent=4))

httpx 异步调用。接下来认识 httpx 的异步调用:

import jsonimport httpximport asyncioasync def main():async with httpx.AsyncClient() as client:resp = await client.get('/get')result = resp.json()print(json.dumps(result, ensure_ascii=False, indent=4))asyncio.run(main())

httpx 异步调用

import httpximport asyncioimport timeasync def request(client):global indexresp = await client.get('/get')index += 1result = resp.json()print(f'{index} status_code : {resp.status_code}')assert 200 == resp.status_codeasync def main():async with httpx.AsyncClient() as client:# 50 次调用task_list = []for _ in range(50):req = request(client)task = asyncio.create_task(req)task_list.append(task)await asyncio.gather(*task_list)if __name__ == "__main__":index = 0# 开始start = time.time()asyncio.run(main())# 结束end = time.time()print(f'异步:发送 50 次请求,耗时:{end - start}')

想搞懂 异步框架 和异步接口的调用,可以按这个路线学习:1.python异步编程、2.python Web异步框架(tornado/sanic)、3.异步接口调用(aiohttp/httpx)

1. asyncio

示例 1:( Python 3.5+ 之前的写法)

import asyncio@asyncio.coroutinedef func1():print('before...func1......')yield from asyncio.sleep(5)print('end...func1......')tasks = [func1(), func1()]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.gather(*tasks))loop.close()

改进,使用 async / await 关键字 (Python 3.5+ 开始引入了新的语法async和await )

import asyncioasync def func1():print('before...func1......')await asyncio.sleep(5)print('end...func1......')tasks = [func1(), func1()]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.gather(*tasks))loop.close()

示例 2 :

import asyncioasync def fetch_async(host, url='/'):print(host, url)reader, writer = await asyncio.open_connection(host, 80)request_header_content = """GET %s HTTP/1.0\r\nHost: %s\r\n\r\n""" % (url, host,)request_header_content = bytes(request_header_content, encoding='utf-8')writer.write(request_header_content)await writer.drain()text = await reader.read()print(host, url, text)writer.close()tasks = [fetch_async('', '/wupeiqi/'),fetch_async('', '/pic/show?nid=4073644713430508&lid=10273091')]loop = asyncio.get_event_loop()results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()

示例 3:

#!/usr/bin/env python# encoding:utf-8import asyncioimport aiohttpimport timeasync def download(url): # 通过async def定义的函数是原生的协程对象print("get: %s" % url)async with aiohttp.ClientSession() as session:async with session.get(url) as resp:print(resp.status)# response = await resp.read()async def main():start = time.time()await asyncio.wait([download(""),download(""),download("")])end = time.time()print("Complete in {} seconds".format(end - start))loop = asyncio.get_event_loop()loop.run_until_complete(main())

Python 异步编程之 asyncio(百万并发)

前言:python 由于 GIL(全局锁)的存在,不能发挥多核的优势,其性能一直饱受诟病。然而在 IO 密集型的网络编程里,异步处理比同步处理能提升成百上千倍的效率,弥补了 python 性能方面的短板,如最新的微服务框架 japronto,resquests per second 可达百万级。

python 还有一个优势是库(第三方库)极为丰富,运用十分方便。asyncio 是 python3.4 版本引入到标准库,python2x 没有加这个库,毕竟 python3x 才是未来啊,哈哈!python3.5 又加入了 async/await 特性。在学习 asyncio 之前,我们先来理清楚同步/异步的概念

同步是指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行。。。异步是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果。

aiohttp 使用

如果需要并发 http 请求怎么办呢,通常是用 requests,但 requests 是同步的库,如果想异步的话需要引入 aiohttp。这里引入一个类,from aiohttp import ClientSession,首先要建立一个 session 对象,然后用 session 对象去打开网页。session 可以进行多项操作,比如post、get、put、head等。

基本用法:

async with ClientSession() as session:async with session.get(url) as response:

aiohttp 异步实现的例子:

import asynciofrom aiohttp import ClientSessiontasks = []url = "/get?args=hello_word"async def hello(t_url):async with ClientSession() as session:async with session.get(t_url) as req:response = await req.read()# response = await req.text()# print(response)print(f'{req.url} : {req.status}')if __name__ == '__main__':loop = asyncio.get_event_loop()loop.run_until_complete(hello(url))

首先 async def 关键字定义了这是个异步函数,await 关键字加在需要等待的操作前面,req.read() 等待 request 响应,是个耗 IO 操作。然后使用 ClientSession 类发起 http 请求。

异步请求多个URL

如果我们需要请求多个 URL 该怎么办呢?

同步的做法:访问多个 URL时,只需要加个 for 循环就可以了。但异步的实现方式并没那么容易:在之前的基础上需要将 hello()包装在 asyncio 的 Future 对象中,然后将 Future对象列表 作为 任务 传递给 事件循环

import datetimeimport asynciofrom aiohttp import ClientSessiontask_list = []url = "/{}"async def hello(t_url):ret_val = Noneasync with ClientSession() as session:async with session.get(t_url) as req:response = await req.read()print(f'Hello World:{datetime.datetime.now().replace(microsecond=0)}')# print(response)ret_val = req.statusreturn ret_valdef run():for i in range(5):one_task = asyncio.ensure_future(hello(url.format(i)))task_list.append(one_task)if __name__ == '__main__':loop = asyncio.get_event_loop()run()result = loop.run_until_complete(asyncio.wait(task_list))# 方法 1 : 获取结果for task in task_list:print(task.result())# 方法 2 : 获取结果finish_task, pending_task = resultprint(f'finish_task count:{len(pending_task)}')for task in finish_task:print(task.result())print(f'pending_task count:{len(pending_task)}')for task in pending_task:print(task.result())'''Hello World:-12-06 16:29:02Hello World:-12-06 16:29:02Hello World:-12-06 16:29:02Hello World:-12-06 16:29:02Hello World:-12-06 16:29:02404404404404404finish_task count:0404404404404404pending_task count:0'''

收集 http 响应

上面介绍了访问不同 URL 的异步实现方式,但是我们只是发出了请求,如果要把响应一一收集到一个列表中,最后保存到本地或者打印出来要怎么实现呢?

可通过 asyncio.gather(*tasks) 将响应全部收集起来,具体通过下面实例来演示。

import timeimport asynciofrom aiohttp import ClientSessiontask_list = []temp_url = "/{}"async def hello(url=None):async with ClientSession() as session:async with session.get(url) as request:# print(request)print('Hello World:%s' % time.time())return await request.read()def run():for i in range(5):task = asyncio.ensure_future(hello(temp_url.format(i)))task_list.append(task)result = loop.run_until_complete(asyncio.gather(*task_list))print(f'len(result) : {len(result)}')for item in result:print(item)if __name__ == '__main__':loop = asyncio.get_event_loop()run()

限制并发数(最大文件描述符的限制)

提示:此方法也可用来作为异步爬虫的限速方法(反反爬)

假如你的并发达到 2000 个,程序会报错:ValueError: too many file descriptors in select()。报错的原因字面上看是 Python 调取的 select 对打开的文件有最大数量的限制,这个其实是操作系统的限制,linux 打开文件的最大数默认是 1024,windows 默认是 509,超过了这个值,程序就开始报错。这里我们有三种方法解决这个问题:

1. 限制并发数量。(一次不要塞那么多任务,或者限制最大并发数量)2. 使用回调的方式3. 修改操作系统打开文件数的最大限制,在系统里有个配置文件可以修改默认值,具体步骤不再说明了。

不修改系统默认配置的话,个人推荐限制并发数的方法,设置并发数为 500,处理速度更快。

使用 semaphore = asyncio.Semaphore(500) 以及在协程中使用 async with semaphore: 操作具体代码如下:

# coding:utf-8import time, asyncio, aiohttpurl = '/'index = 0async def hello(url, semaphore):global indexasync with semaphore:async with aiohttp.ClientSession() as session:async with session.get(url) as response:print(f'{index} : ', end='')await asyncio.sleep(2)print(response.status)return await response.read()async def run():# 为了看效果,这是设置 100 个任务,并发限制为 5semaphore = asyncio.Semaphore(5) # 限制并发量为500to_get = [hello(url.format(), semaphore) for _ in range(100)] # 总共1000任务await asyncio.wait(to_get)if __name__ == '__main__':# now=lambda :time.time()loop = asyncio.get_event_loop()loop.run_until_complete(run())# loop.close()

示例代码:

import asyncioimport aiohttpasync def get_http(url):async with semaphore:async with aiohttp.ClientSession() as session:async with session.get(url) as res:global countcount += 1print(count, res.status)if __name__ == '__main__':count = 0semaphore = asyncio.Semaphore(500)loop = asyncio.get_event_loop()temp_url = '/s?wd={0}'tasks = [get_http(temp_url.format(i)) for i in range(600)]loop.run_until_complete(asyncio.wait(tasks))loop.close()

示例代码:

from aiohttp import ClientSessionimport asyncio# 限制协程并发量async def hello(sem, num):async with sem:async with ClientSession() as session:async with session.get(f'/get?a={num}') as response:r = await response.read()print(f'[{num}]:{r}')await asyncio.sleep(1)def main():loop = asyncio.get_event_loop()tasks = []sem = asyncio.Semaphore(5) # thisfor index in range(100000):task = asyncio.ensure_future(hello(sem, index))tasks.append(task)feature = asyncio.ensure_future(asyncio.gather(*tasks))try:loop.run_until_complete(feature)finally:loop.close()if __name__ == "__main__":main()

aiohttp 实现高并发爬虫 ( 异步 mysql)

python asyncio并发编程:/crazymagic/articles/10066619.html

# asyncio爬虫, 去重, 入库import asyncioimport reimport aiohttpimport aiomysqlfrom pyquery import PyQuerystopping = Falsestart_url = ''waiting_urls = []seen_urls = set() # 实际使用爬虫去重时,数量过多,需要使用布隆过滤器async def fetch(url, session):async with aiohttp.ClientSession() as session:try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:data = await resp.text()return dataexcept Exception as e:print(e)def extract_urls(html): # html中提取所有urlurls = []pq = PyQuery(html)for link in pq.items('a'):url = link.attr('href')if url and url.startwith('http') and url not in seen_urls:urls.append(url)waiting_urls.append(urls)return urlsasync def init_urls(url, session):html = await fetch(url, session)seen_urls.add(url)extract_urls(html)async def article_handler(url, session, pool): # 获取文章详情并解析入库html = await fetch(url, session)extract_urls(html)pq = PyQuery(html)title = pq('title').text() # 为了简单, 只获取title的内容async with pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute('SELECT 42;')insert_sql = "insert into article_test(title) values('{}')".format(title)await cur.execute(insert_sql) # 插入数据库# print(cur.description)# (r,) = await cur.fetchone()# assert r == 42async def consumer(pool):async with aiohttp.ClientSession() as session:while not stopping:if len(waiting_urls) == 0: # 如果使用asyncio.Queue的话, 不需要我们来处理这些逻辑。await asyncio.sleep(0.5)continueurl = waiting_urls.pop()print('start get url:{}'.format(url))if re.match(r'http://.*?/\d+/', url):if url not in seen_urls: # 是没有处理过的url,则处理asyncio.ensure_future(article_handler(url, sssion, pool))else:if url not in seen_urls:asyncio.ensure_future(init_urls(url))async def main(loop):# 等待mysql连接建立好pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='',db='aiomysql_test', loop=loop, charset='utf8', autocommit=True)# charset autocommit必须设置, 这是坑, 不写数据库写入不了中文数据async with aiohttp.ClientSession() as session:html = await fetch(start_url, session)seen_urls.add(start_url)extract_urls(html)asyncio.ensure_future(consumer(pool))if __name__ == '__main__':event_loop = asyncio.get_event_loop()asyncio.ensure_future(main(event_loop))event_loop.run_forever()

学习 python 高并发模块 asynio

参考:Python黑魔法 --- 异步IO( asyncio) 协程:/p/b5e347b3a17c

Python 中重要的模块 --- asyncio:/zhaof/p/8490045.html

Python 协程深入理解:/zhaof/p/7631851.html

asyncio 是 python 用于解决异步io编程的一整套解决方案

创建一个 asyncio 的步骤如下

创建一个 event_loop 事件循环,当启动时,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数。创建协程: 使用 async 关键字定义的函数就是一个协程对象。在协程函数内部可以使用 await 关键字用于阻塞操作的挂起。将协程注册到事件循环中。协程的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。

一、定义一个协程

import timeimport asyncioasync def do_some_work(x):print("waiting:", x)start = time.time()# 这里是一个协程对象,这个时候do_some_work函数并没有执行coroutine = do_some_work(2)print(coroutine)# 创建一个事件looploop = asyncio.get_event_loop()# 将协程注册到事件循环,并启动事件循环loop.run_until_complete(coroutine)print("Time:", time.time() - start)

二、创建一个 task

一个协程对象就是 一个原生可以挂起的函数任务则是对协程进一步封装,其中包含了任务的各种状态。

在上面的代码中,在注册事件循环的时候,其实是 run_until_complete 方法将协程包装成为了一个任务(task)对象。task 对象是 Future类的子类,保存了协程运行后的状态,用于未来获取协程的结果

import asyncioimport timestart = lambda: time.time()async def do_some_work(x):print("waiting:", x)start = start()coroutine = do_some_work(2)loop = asyncio.get_event_loop()task = loop.create_task(coroutine)print(task)loop.run_until_complete(task)print(task)print("Time:", time.time() - start)

loop.create_task,asyncio.async / asyncio.ensure_future 和Task有什么区别?

BaseEventLoop.create_task(coro) 、asyncio.async(coro)、Task(coro)安排协同程序执行,这似乎也可以正常工作。那么,所有这些之间有什么区别?

在 Python> = 3.5中,已将 async 设为关键字,所以asyncio.async必须替换为asyncio.ensure_futurecreate_task 的存在理由:

第三方事件循环可以使用其自己的Task子类来实现互操作性。在这种情况下,结果类型是Task的子类。

这也意味着您不应直接创建 Task ,因为不同的事件循环可能具有不同的创建"任务"的方式。另一个重要区别是,除了接受协程外,ensure_future也接受任何等待的对象;而create_task只接受协程。

那么用ensure_future还是create_task函数声明对比:

asyncio.ensure_future(coro_or_future, *, loop=None)BaseEventLoop.create_task(coro)

显然,ensure_future 除了接受 coroutine 作为参数,还接受 future 作为参数。

看 ensure_future 的代码,会发现 ensure_future 内部在某些条件下会调用 create_task,综上所述:

encure_future: 最高层的函数,推荐使用!create_task: 在确定参数是 coroutine 的情况下可以使用。Task: 可能很多时候也可以工作,但真的没有使用的理由!

为了 interoperability,第三方的事件循环可以使用自己的 Task 子类。这种情况下,返回结果的类型是 Task 的子类。

所以,不要直接创建 Task 实例,应该使用 ensure_future() 函数或 BaseEventLoop.create_task() 方法。

asyncio.ensure_futureBaseEventLoop.create_task对比简单的协同程序

From:/blog/article/55686/127e90ac54b5ed388d52/

看过几个关于asyncio的基本Python 3.5教程,它们以各种方式执行相同的操作。在这段代码中:

import asyncioasync def doit(i):print("Start %d" % i)await asyncio.sleep(3)print("End %d" % i)return iif __name__ == '__main__':loop = asyncio.get_event_loop()# futures = [asyncio.ensure_future(doit(i), loop=loop) for i in range(10)]# futures = [loop.create_task(doit(i)) for i in range(10)]futures = [doit(i) for i in range(10)]result = loop.run_until_complete(asyncio.gather(*futures))print(result)

上面定义futures变量的所有三个变体都实现了相同的结果,那么他们有什么区别?有些情况下我不能只使用最简单的变体(协程的简单列表)吗?

asyncio.create_task(coro)asyncio.ensure_future(obj)

从 Python 3.7 开始,为此目的添加了asyncio.create_task(coro)高级功能,可以使用它来代替从 coroutimes 创建任务的其他方法。

但是,如果需要从任意等待创建任务,应该使用asyncio.ensure_future(obj)

推荐:使用asyncio.ensure_future(obj) 来代替 asyncio.create_task(coro)

ensure_futureVScreate_task

ensure_future是创建一个方法Taskcoroutine。它基于参数以不同的方式创建任务(包括使用create_task协同程序和类似未来的对象)。

create_task是一种抽象的方法AbstractEventLoop。不同的事件循环可以不同的方式实现此功能。

您应该使用ensure_future创建任务。create_task只有在你要实现自己的事件循环类型时才需要。

“当从协程创建任务时,你应该使用适当命名的loop.create_task()

在任务中包装协程 - 是一种在后台启动此协程的方法。这是一个例子:

import asyncioasync def msg(text):await asyncio.sleep(0.1)print(text)async def long_operation():print('long_operation started')await asyncio.sleep(3)print('long_operation finished')async def main():await msg('first')# Now you want to start long_operation, but you don't want to wait it finised:# long_operation should be started, but second msg should be printed immediately.# Create task to do so:task = asyncio.ensure_future(long_operation())await msg('second')# Now, when you want, you can await task finised:await taskif __name__ == "__main__":loop = asyncio.get_event_loop()loop.run_until_complete(main())'''输出:firstlong_operation startedsecondlong_operation finished'''

创建任务:

可以通过loop.create_task(coroutine) 创建 task,也可以通过asyncio.ensure_future(coroutine) 创建 task。

使用这两种方式的区别在官网/zh-cn/3/library/asyncio-task.html#asyncio.ensure_future)上有提及。

task / future 以及使用 async 创建的都是 awaitable 对象,都可以在 await 关键字之后使用。

future 对象意味着在未来返回结果,可以搭配回调函数使用

要真正运行一个协程,asyncio 提供了三种主要机制

(/zh-cn/3/library/asyncio-task.html#asyncio.ensure_future)

1.asyncio.run()函数用来运行最高层级的入口点 "main()" 函数。

import asyncioasync def main():print('hello')await asyncio.sleep(1)print('world')asyncio.run(main())

2. 使用 await ( 即等待一个协程

import asyncioimport timeasync def say_after(delay, what):await asyncio.sleep(delay)print(what)async def main():print(f"started at {time.strftime('%X')}")await say_after(3, 'hello')await say_after(1, 'world')print(f"finished at {time.strftime('%X')}")asyncio.run(main())

预期的输出:

started at 17:13:52worldhellofinished at 17:13:55

3. 使用asyncio.create_task()函数用来并发运行作为 asyncio任务的多个协程。

import asyncioimport timeasync def say_after(delay, what):await asyncio.sleep(delay)print(what)async def main():task1 = asyncio.create_task(say_after(3, 'hello'))task2 = asyncio.create_task(say_after(1, 'world'))print(f"started at {time.strftime('%X')}")# Wait until both tasks are completed (should take# around 2 seconds.)await task1await task2asyncio.run(main())

预期的输出:

started at 17:14:32worldhellofinished at 17:14:34

获取协程的返回值

1 创建一个任务 task2 通过调用 task.result 获取协程的返回值

import asyncioimport timeasync def get_html(url):print("start get url")await asyncio.sleep(2)return "this is test"if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()task = loop.create_task(get_html(""))loop.run_until_complete(task)print(task.result())

三、绑定回调

执行成功进行回调处理

可以通过 add_done_callback( 任务) 添加回调,因为这个函数只接受一个回调的函数名,不能传参,我们想要传参可以使用偏函数

# 获取协程的返回值import asyncioimport timefrom functools import partialasync def get_html(url):print("start get url")await asyncio.sleep(2)return "this is test"def callback(url, future):print(url)print("send email to bobby")if __name__ == "__main__":start_time = time.time()loop = asyncio.get_event_loop()task = loop.create_task(get_html(""))task.add_done_callback(partial(callback, ""))loop.run_until_complete(task)print(task.result())

asnycio 异步请求+异步回调/u013917468/article/details/104609908

当使用 ensure_feature 创建任务的时候,可以使用任务的 task.add_done_callback(callback)方法,获得对象的协程返回值。

import asyncioimport timeasync def do_some_work(x):print("waiting:", x)return "Done after {}s".format(x)def callback(future):print("callback:", future.result())start = time.time()coroutine = do_some_work(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)print(task)task.add_done_callback(callback)print(task)loop.run_until_complete(task)

四、阻塞 (使用await 让出控制权,挂起当前操作 )

前面提到 asynic 函数内部可以使用 await 来针对耗时的操作进行挂起。

import asyncioimport timeasync def do_some_work(x):print("waiting:", x)# await 后面就是调用耗时的操作await asyncio.sleep(x)return "Done after {}s".format(x)coroutine = do_some_work(2)loop = asyncio.get_event_loop()task = asyncio.ensure_future(coroutine)loop.run_until_complete(task)

五、并发 和 并行

并发:同一时刻同时发生并行:同一时间间隔发生

并发通常是指有多个任务需要同时进行,并行则是同一个时刻有多个任务执行.

当有多个任务需要并行时,可以将任务先放置在任务队列中,然后将任务队列传给 asynicio.wait 方法,这个方法会同时并行运行队列中的任务。将其注册到事件循环中。

import asyncioasync def do_some_work(x):print("Waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))

六、嵌套协程

使用 async 可以定义协程,协程用于耗时的 io 操作,我们也可以封装更多的 io 操作过程,这样就实现了嵌套的协程,即一个协程中 await 了另外一个协程,如此连接起来。

import asyncioasync def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]dones, pendings = await asyncio.wait(tasks)for task in dones:print("Task ret:", task.result())# results = await asyncio.gather(*tasks)# for result in results:#print("Task ret:",result)loop = asyncio.get_event_loop()loop.run_until_complete(main())

使用 asyncio.wait 的结果如下,可见返回的结果 dones 并不一定按照顺序输出

waiting: 1waiting: 2waiting: 4Task ret: Done after 2sTask ret: Done after 4sTask ret: Done after 1sTime: 4.006587505340576

使用 await asyncio.gather(*tasks) 得到的结果如下,是按照列表顺序进行返回的

waiting: 1waiting: 2waiting: 4Task ret: Done after 1sTask ret: Done after 2sTask ret: Done after 4sTime: 4.004234313964844

上面的程序将 main 也定义为协程。我们也可以不在 main 协程函数里处理结果,直接返回 await 的内容,那么最外层的 run_until_complete 将会返回main协程的结果。

import asyncioimport timenow = lambda: time.time()async def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]return await asyncio.gather(*tasks)# return await asyncio.wait(tasks)也可以使用。注意gather方法需要*这个标记start = now()loop = asyncio.get_event_loop()results = loop.run_until_complete(main())for result in results:print("Task ret:", result)print("Time:", now() - start)

也可以使用 as_complete 方法实现嵌套协程

import asyncioimport timenow = lambda: time.time()async def do_some_work(x):print("waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)async def main():coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(4)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]for task in asyncio.as_completed(tasks):result = await taskprint("Task ret: {}".format(result))start = now()loop = asyncio.get_event_loop()loop.run_until_complete(main())print("Time:", now() - start)

七、协程停止

创建 future 的时候,task 为 pending,事件循环调用执行的时候当然就是 running,调用完毕自然就是 done,如果需要停止事件循环,就需要先把 task 取消。可以使用 asyncio.Task 获取事件循环的 task。

future 对象有如下几个状态:Pending、Running、Done、Cacelled

import asyncioimport timenow = lambda: time.time()async def do_some_work(x):print("Waiting:", x)await asyncio.sleep(x)return "Done after {}s".format(x)coroutine1 = do_some_work(1)coroutine2 = do_some_work(2)coroutine3 = do_some_work(2)tasks = [asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3),]start = now()loop = asyncio.get_event_loop()try:loop.run_until_complete(asyncio.wait(tasks))except KeyboardInterrupt as e:print(asyncio.Task.all_tasks())for task in asyncio.Task.all_tasks():print(task.cancel())loop.stop()loop.run_forever()finally:loop.close()print("Time:", now() - start)

启动事件循环之后,马上 ctrl+c,会触发 run_until_complete 的执行异常 KeyBorardInterrupt。然后通过循环 asyncio.Task 取消 future。可以看到输出如下:

Waiting: 1Waiting: 2Waiting: 2^C{<Task finished coro=<do_some_work() done, defined at /app/py_code/study_asyncio/simple_ex10.py:13> result='Done after 1s'>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex10.py:15> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/lib/python3.5/asyncio/tasks.py:428]>, <Task pending coro=<wait() running at /usr/local/lib/python3.5/asyncio/tasks.py:361> wait_for=<Future pending cb=[Task._wakeup()]>>}FalseTrueTrueTrueTime: 1.0707225799560547

True 表示 cannel 成功,loop stop 之后还需要再次开启事件循环,最后在 close,不然还会抛出异常.

循环 task,逐个 cancel 是一种方案,可是正如上面我们把 task 的列表封装在 main 函数中,main 函数外进行事件循环的调用。这个时候,main 相当于最外出的一个task,那么处理包装的main 函数即可。

task取消和子协程调用原理

程序运行时 通过 ctl +c 取消任务 调用task.cancel()取消任务

import asyncioimport timeasync def get_html(sleep_times):print("waiting")await asyncio.sleep(sleep_times)print("done after {}s".format(sleep_times))if __name__ == "__main__":task1 = get_html(2)task2 = get_html(3)task3 = get_html(3)tasks = [task1, task2, task3]loop = asyncio.get_event_loop()try:loop.run_until_complete(asyncio.wait(tasks))except KeyboardInterrupt as e:all_tasks = asyncio.Task.all_tasks()for task in all_tasks:print("cancel task")print(task.cancel())loop.stop()loop.run_forever()finally:loop.close()

在终端执行:python ceshi.py ,运行成功后 按 ctl +c 取消任务

不同线程的事件循环( 线程、线程池 )

很多时候,我们的事件循环用于注册协程,而有的协程需要动态的添加到事件循环中。一个简单的方式就是使用多线程。当前线程创建一个事件循环,然后在新建一个线程,在新线程中启动事件循环。当前线程不会被 block。

import asynciofrom threading import Threadimport timenow = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = now()new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,))t.start()print('TIME: {}'.format(time.time() - start))new_loop.call_soon_threadsafe(more_work, 6)new_loop.call_soon_threadsafe(more_work, 3)

启动上述代码之后,当前线程不会被 block,新线程中会按照顺序执行 call_soon_threadsafe 方法注册的 more_work 方法, 后者因为 time.sleep 操作是同步阻塞的,因此运行完毕more_work 需要大致 6 + 3

使用 线程池

# -*- coding:utf-8 -*-import asyncioimport timefrom concurrent.futures import ThreadPoolExecutorthread_pool = ThreadPoolExecutor(5)tasks = []func_now = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = func_now()new_loop = asyncio.new_event_loop()thread_pool.submit(start_loop, new_loop)print('TIME: {}'.format(time.time() - start))new_loop.call_soon_threadsafe(more_work, 6)new_loop.call_soon_threadsafe(more_work, 3)

主线程创建事件循环,子线程开启无限事件循环

import asyncioimport timefrom threading import Threadnow = lambda: time.time()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def do_some_work(x):print('Waiting {}'.format(x))await asyncio.sleep(x)print('Done after {}s'.format(x))def more_work(x):print('More work {}'.format(x))time.sleep(x)print('Finished more work {}'.format(x))start = now()new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,))t.start()print('TIME: {}'.format(time.time() - start))asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主线程中创建一个 new_loop,然后在另外的子线程中开启一个无限事件循环。 主线程通过run_coroutine_threadsafe新注册协程对象。这样就能在子线程中进行事件循环的并发操作,同时主线程又不会被 block。一共执行的时间大概在 6s 左右。

master - worker 主从模式

对于并发任务,通常是用生成消费模型,对队列的处理可以使用类似 master-worker 的方式,master 主要用户获取队列的 msg,worker 用户处理消息。

为了简单起见,并且协程更适合单线程的方式,我们的主线程用来监听队列,子线程用于处理队列。这里使用 redis 的队列。主线程中有一个是无限循环,用户消费队列。

import timeimport asynciofrom threading import Threadimport redisdef get_redis(): # 返回一个 redis 连接对象connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)def start_loop(loop): # 开启事件循环asyncio.set_event_loop(loop)loop.run_forever()async def worker(task):print('Start worker')while True:# start = now()# task = rcon.rpop("queue") # 从 redis 中 取出的数据# if not task:#await asyncio.sleep(1)#continueprint('Wait ', int(task)) # 取出了相应的任务await asyncio.sleep(int(task))print('Done ', task, now() - start)now = lambda: time.time()rcon = get_redis()start = now()# 创建一个事件循环new_loop = asyncio.new_event_loop()# 创建一个线程 在新的线程中开启事件循环t = Thread(target=start_loop, args=(new_loop,))t.setDaemon(True) # 设置线程为守护模式t.start() # 开启线程try:while True:task = rcon.rpop("queue") # 不断从队列中获取任务if not task:time.sleep(1)continue# 包装为 task ins, 传入子线程中的事件循环asyncio.run_coroutine_threadsafe(worker(task), new_loop)except Exception as e:print('error', e)new_loop.stop() # 出现异常 关闭时间循环finally:pass

给队列添加一些数据:

127.0.0.1:6379[3]> lpush queue 2(integer) 1127.0.0.1:6379[3]> lpush queue 5(integer) 1127.0.0.1:6379[3]> lpush queue 1(integer) 1127.0.0.1:6379[3]> lpush queue 1

可以看见输出:

Waiting 2Done 2Waiting 5Waiting 1Done 1Waiting 1Done 1Done 5

我们发起了一个耗时5s的操作,然后又发起了连个1s的操作,可以看见子线程并发的执行了这几个任务,其中5s awati的时候,相继执行了1s的两个任务。

改进:

import timeimport redisimport asynciofrom threading import Threadredis_queue_name = 'redis_list:test'def get_redis(): # 返回一个 redis 连接对象connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.StrictRedis(connection_pool=connection_pool)def add_data_to_redis_list(*args):redis_conn = get_redis()redis_conn.lpush(redis_queue_name, *args)def start_loop(loop=None): # 开启事件循环asyncio.set_event_loop(loop)loop.run_forever()async def worker(task=None):print('Start worker')while True:# task = redis_conn.rpop("queue") # 从 redis 中 取出的数据# if not task:#await asyncio.sleep(1)#continueprint('Wait ', int(task)) # 取出了相应的任务# 这里只是简单的睡眠传入的秒数await asyncio.sleep(int(task))def main():redis_conn = get_redis()# 创建一个事件循环new_loop = asyncio.new_event_loop()# 创建一个线程 在新的线程中开启事件循环t = Thread(target=start_loop, args=(new_loop,))t.setDaemon(True) # 设置线程为守护模式t.start() # 开启线程try:while True:task = redis_conn.rpop(name=redis_queue_name) # 不断从队列中获取任务if not task:time.sleep(1)continue# 包装为 task , 传入子线程中的事件循环asyncio.run_coroutine_threadsafe(worker(task), new_loop)except Exception as e:print('error', e)new_loop.stop() # 出现异常 关闭时间循环finally:new_loop.close()if __name__ == '__main__':# data_list = [1, 2, 3, 4, 5, 6, 7, 8, 9]# add_data_to_redis_list(*data_list)main()pass

redis队列模型( 生产者 --- 消费者)

参考:/p/59621713

下面代码的主线程和双向队列的主线程有些不同,只是换了一种写法而已,代码如下

生产者代码:

import redisconn_pool = redis.ConnectionPool(host='127.0.0.1')redis_conn = redis.Redis(connection_pool=conn_pool)redis_conn.lpush('coro_test', '1')redis_conn.lpush('coro_test', '2')redis_conn.lpush('coro_test', '3')redis_conn.lpush('coro_test', '4')

消费者代码:

import asynciofrom threading import Threadimport redisdef get_redis():conn_pool = redis.ConnectionPool(host='127.0.0.1')return redis.Redis(connection_pool=conn_pool)def start_thread_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def thread_example(name):print('正在执行name:', name)await asyncio.sleep(2)return '返回结果:' + nameredis_conn = get_redis()new_loop = asyncio.new_event_loop()loop_thread = Thread(target=start_thread_loop, args=(new_loop,))loop_thread.setDaemon(True)loop_thread.start()# 循环接收redis消息并动态加入协程while True:msg = redis_conn.rpop('coro_test')if msg:asyncio.run_coroutine_threadsafe(thread_example('Zarten' + bytes.decode(msg, 'utf-8')), new_loop)

改进:

import asynciofrom threading import Threadimport redisdef get_redis():conn_pool = redis.ConnectionPool(host='127.0.0.1')return redis.Redis(connection_pool=conn_pool)def start_thread_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def thread_example(name):print('正在执行name:', name)await asyncio.sleep(2)return '返回结果:' + nameif __name__ == '__main__':redis_queue_name = 'redis_list:test'redis_conn = get_redis()for num in range(1, 10):redis_conn.lpush(redis_queue_name, num)new_loop = asyncio.new_event_loop()loop_thread = Thread(target=start_thread_loop, args=(new_loop,))loop_thread.setDaemon(True)loop_thread.start()# 循环接收redis消息并动态加入协程while True:msg = redis_conn.rpop(name=redis_queue_name)if msg:asyncio.run_coroutine_threadsafe(thread_example('King_' + msg.decode('utf-8')), new_loop)pass

停止子线程

如果一切正常,那么上面的例子很完美。可是,需要停止程序,直接ctrl+c,会抛出KeyboardInterrupt错误,我们修改一下主循环:

try:while True:task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)except KeyboardInterrupt as e:print(e)new_loop.stop()

可是实际上并不好使,虽然主线程 try 了 KeyboardInterrupt异常,但是子线程并没有退出,为了解决这个问题,可以设置子线程为守护线程,这样当主线程结束的时候,子线程也随机退出。

new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,))t.setDaemon(True) # 设置子线程为守护线程t.start()try:while True:# print('start rpop')task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)except KeyboardInterrupt as e:print(e)new_loop.stop()

线程停止程序的时候,主线程退出后,子线程也随机退出才了,并且停止了子线程的协程任务。

aiohttp

在消费队列的时候,我们使用 asyncio 的 sleep 用于模拟耗时的 io 操作。以前有一个短信服务,需要在协程中请求远程的短信 api,此时需要是需要使用 aiohttp 进行异步的 http 请求。大致代码如下:

server.py

import timefrom flask import Flask, requestapp = Flask(__name__)@app.route('/<int:x>')def index(x):time.sleep(x)return "{} It works".format(x)@app.route('/error')def error():time.sleep(3)return "error!"if __name__ == '__main__':app.run(debug=True)

/接口表示短信接口,/error表示请求/失败之后的报警。

async-custoimer.py

import timeimport asynciofrom threading import Threadimport redisimport aiohttpdef get_redis():connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)rcon = get_redis()def start_loop(loop):asyncio.set_event_loop(loop)loop.run_forever()async def fetch(url):async with aiohttp.ClientSession() as session:async with session.get(url) as resp:print(resp.status)return await resp.text()async def do_some_work(x):print('Waiting ', x)try:ret = await fetch(url='http://127.0.0.1:5000/{}'.format(x))print(ret)except Exception as e:try:print(await fetch(url='http://127.0.0.1:5000/error'))except Exception as e:print(e)else:print('Done {}'.format(x))new_loop = asyncio.new_event_loop()t = Thread(target=start_loop, args=(new_loop,))t.setDaemon(True)t.start()try:while True:task = rcon.rpop("queue")if not task:time.sleep(1)continueasyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)except Exception as e:print('error')new_loop.stop()finally:pass

有一个问题需要注意,我们在fetch的时候try了异常,如果没有try这个异常,即使发生了异常,子线程的事件循环也不会退出。主线程也不会退出,暂时没找到办法可以把子线程的异常raise传播到主线程。(如果谁找到了比较好的方式,希望可以带带我)。

对于 redis 的消费,还有一个 block 的方法:

try:while True:_, task = rcon.brpop("queue")asyncio.run_coroutine_threadsafe(do_some_work(int(task)), new_loop)except Exception as e:print('error', e)new_loop.stop()finally:pass

使用 brpop方法,会 block 住 task,如果主线程有消息,才会消费。测试了一下,似乎 brpop 的方式更适合这种队列消费的模型。

127.0.0.1:6379[3]> lpush queue 5(integer) 1127.0.0.1:6379[3]> lpush queue 1(integer) 1127.0.0.1:6379[3]> lpush queue 1

可以看到结果

Waiting 5Waiting 1Waiting 12001 It worksDone 12001 It worksDone 1 It worksDone 5

协程消费

主线程用于监听队列,然后子线程的做事件循环的worker是一种方式。还有一种方式实现这种类似master-worker的方案。即把监听队列的无限循环逻辑一道协程中。程序初始化就创建若干个协程,实现类似并行的效果。

import timeimport asyncioimport redisnow = lambda : time.time()def get_redis():connection_pool = redis.ConnectionPool(host='127.0.0.1', db=3)return redis.Redis(connection_pool=connection_pool)rcon = get_redis()async def worker():print('Start worker')while True:start = now()task = rcon.rpop("queue")if not task:await asyncio.sleep(1)continueprint('Wait ', int(task))await asyncio.sleep(int(task))print('Done ', task, now() - start)def main():asyncio.ensure_future(worker())asyncio.ensure_future(worker())loop = asyncio.get_event_loop()try:loop.run_forever()except KeyboardInterrupt as e:print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())loop.stop()loop.run_forever()finally:loop.close()if __name__ == '__main__':main()

这样做就可以多多启动几个worker来监听队列。一样可以到达效果。

总结

上述简单的介绍了asyncio的用法,主要是理解事件循环,协程和任务,future的关系。异步编程不同于常见的同步编程,设计程序的执行流的时候,需要特别的注意。毕竟这和以往的编码经验有点不一样。可是仔细想想,我们平时处事的时候,大脑会自然而然的实现异步协程。比如等待煮茶的时候,可以多写几行代码。

相关代码文件的Gist

参考:Threaded Asynchronous Magic and How to Wield It

示例:

参考:Python 中的并发处理之 asyncio 包使用的详解:/article/137681.htm

import asyncioimport itertoolsimport sysasync def spin(msg):for char in itertools.cycle('|/-\\'):status = char + ' ' + msgprint(status)try:# 使用 await asyncio.sleep(.1) 代替 time.sleep(.1),这样的休眠不会阻塞事件循环。await asyncio.sleep(.1)except asyncio.CancelledError:# 如果 spin 函数苏醒后抛出 asyncio.CancelledError 异常,其原因是发出了取消请求,因此退出循环。breakasync def slow_function():# 假装等待I/O一段时间# await asyncio.sleep(3) 表达式把控制权交给主循环,在休眠结束后恢复这个协程。await asyncio.sleep(3)return 42async def supervisor():# asyncio.ensure_future(...) 函数排定 spin 协程的运行时间,使用一个 Task 对象包装 spin 协程,并立即返回。spinner = asyncio.ensure_future(spin('thinking!'))print('spinner object:', spinner)t_result = await slow_function() # 驱动 slow_function() 函数。结束后,获取返回值。# 同时,事件循环继续运行,因为slow_function 函数最后使用 await asyncio.sleep(3) 表达式把控制权交回给了主循环。# Task 对象可以取消;取消后会在协程当前暂停的 yield 处抛出 asyncio.CancelledError 异常。# 协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消。spinner.cancel()return t_resultif __name__ == '__main__':loop = asyncio.get_event_loop() # 获取事件循环的引用# 驱动 supervisor 协程,让它运行完毕;这个协程的返回值是这次调用的返回值。result = loop.run_until_complete(supervisor())loop.close()print('Answer:', result)

二、避免阻塞型调用

1、有两种方法能避免阻塞型调用中止整个应用程序的进程:

在单独的线程中运行各个阻塞型操作。把每个阻塞型操作转换成非阻塞的异步调用。

使用多线程处理大量连接时将耗费过多的内存,故此通常使用回调来实现异步调用。

2、使用Executor对象防止阻塞事件循环:

使用 loop.run_in_executor 把阻塞的作业(例如保存文件)委托给线程池做。

示例:

import asyncioimport timeasync def shop(delay, what):print(what)await asyncio.sleep(delay)print(what,"...出来了")async def main():task1 = asyncio.create_task(shop(8, '女朋友看衣服..'))task2 = asyncio.create_task(shop(5, '体验手机..'))print(time.ctime(), "开始逛街")await task1await task2print(time.ctime(), "结束.")asyncio.run(main())

Python --- aiohttp 的使用

参考:/ssyfj/p/9222342.html

1. aiohttp 的简单使用(配合asyncio模块)

import asyncioimport aiohttpasync def fetch_async(url):print(url)async with aiohttp.request("GET", url) as r:# 或者直接 await r.read()不编码,直接读取,适合于图像等无法编码文件resp = await r.text(encoding="utf-8")print(resp)print('*' * 50)tasks = [fetch_async('/'),fetch_async('/')]if __name__ == '__main__':event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

2. 发起一个 session 请求

import asyncioimport aiohttpasync def fetch_async(url):print(url)async with aiohttp.ClientSession() as session: # 协程嵌套,只需要处理最外层协程即可fetch_asyncasync with session.get(url) as resp:print(resp.status)# 因为这里使用到了 await 关键字,实现异步,所有他上面的函数体需要声明为异步asyncprint(await resp.text())print('*' * 50)if __name__ == '__main__':tasks = [fetch_async('/'),fetch_async('/ssyfj/')]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

除了上面的 get 方法外,会话还支持 post,put,delete .... 等

session.put('/put', data=b'data')session.delete('/delete')session.head('/get')session.options('/get')session.patch('/patch', data=b'data')

不要为每次的连接都创建一次session,一般情况下只需要创建一个session,然后使用这个session执行所有的请求。

每个session对象,内部包含了一个连接池,并且将会保持连接和连接复用(默认开启)可以加快整体的性能。

3. 在 url 中传递参数(其实与 requests 模块使用大致相同)

只需要将参数字典,传入 params 参数中即可

import asyncioimport aiohttpfrom lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)# print(await r.read())ss = etree.HTML(text=await r.text())movie_name = ss.xpath('//ol[@class="grid_view"]//div[@class="hd"]//a//span[1]//text()')print(f'len: {len(list(map(lambda i=None: print(i), movie_name)))}')if __name__ == '__main__':# /top250?start=50tasks = [func1('/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

4. 获取响应内容(由于获取响应内容是一个阻塞耗时过程,所以我们使用await实现协程切换)

(1)使用 text() 方法

import asyncioimport aiohttpfrom lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(r.charset) # 查看默认编码为 utf-8print(await r.text()) # 不编码使用默认编码,也可以使用 encoding 指定编码if __name__ == '__main__':# /top250?start=50tasks = [func1('/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

(2)使用 read() 方法,不进行编码,为字节形式

import asyncioimport aiohttpfrom lxml import etreeasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(await r.read())if __name__ == '__main__':# /top250?start=50tasks = [func1('/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

(3)注意:text()、read() 方法是把整个响应体读入内存,如果你是获取大量的数据,请考虑使用 "字节流"(StreamResponse)

5. 特殊响应内容 json(和上面一样)

import asyncioimport aiohttpasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(r.url)print(r.charset)# print(await r.json()) # 可以设置编码,设置处理函数print(await r.read())if __name__ == '__main__':# /top250?start=100tasks = [func1('/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

6. 字节流形式获取数据(不像 text、read 一次获取所有数据)

注意:我们获取的 session.get() 是 Response 对象,他继承于 StreamResponse

import asyncioimport aiohttpasync def func1(url, params):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:print(await r.content.read(10)) # 读取前10字节if __name__ == '__main__':# /top250?start=100tasks = [func1('/top250', {"start": 100}), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

下面字节流形式读取数据,保存文件

import asyncioimport aiohttpasync def func1(url, params, filename):async with aiohttp.ClientSession() as session:async with session.get(url, params=params) as r:with open(filename, "wb") as fp:while True:chunk = await r.content.read(10)if not chunk:breakfp.write(chunk)if __name__ == '__main__':# /top250?start=100tasks = [func1('/top250', {"start": 100}, "1.html"), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

注意:

async with session.get(url,params=params) as r:# 异步上下文管理器with open(filename,"wb") as fp:# 普通上下文管理器

两者的区别:在于异步上下文管理器中定义了__aenter__和__aexit__方法

异步上下文管理器指的是在enterexit方法处能够暂停执行的上下文管理器

为了实现这样的功能,需要加入两个新的方法:__aenter____aexit__。这两个方法都要返回一个 awaitable类型的值。

推文:异步上下文管理器async with和异步迭代器async for

7. 自定义请求头(和 requests 一样)

import asyncioimport aiohttpasync def func1(url, params, filename):async with aiohttp.ClientSession() as session:headers = {'Content-Type': 'text/html; charset=utf-8'}async with session.get(url, params=params, headers=headers) as r:with open(filename, "wb") as fp:while True:chunk = await r.content.read(10)if not chunk:breakfp.write(chunk)if __name__ == '__main__':# /top250?start=100tasks = [func1('/top250', {"start": 100}, "1.html"), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

8. 自定义 cookie

注意:对于自定义cookie,我们需要设置在 ClientSession(cookies=自定义cookie字典),而不是session.get()中

class ClientSession:def __init__(self, *, connector=None, loop=None, cookies=None,headers=None, skip_auto_headers=None,auth=None, json_serialize=json.dumps,request_class=ClientRequest, response_class=ClientResponse,ws_response_class=ClientWebSocketResponse,version=http.HttpVersion11,cookie_jar=None, connector_owner=True, raise_for_status=False,read_timeout=sentinel, conn_timeout=None,timeout=sentinel,auto_decompress=True, trust_env=False,trace_configs=None):

使用:

cookies = {'cookies_are': 'working'}async with ClientSession(cookies=cookies) as session:

9. 获取当前访问网站的 cookie

async with session.get(url) as resp:print(resp.cookies)

10. 获取网站的响应状态码

async with session.get(url) as resp:print(resp.status)

11. 查看响应头

resp.headers # 来查看响应头,得到的值类型是一个dict:resp.raw_headers # 查看原生的响应头,字节类型

12. 查看重定向的响应头(我们此时已经到了新的网址,向之前的网址查看)

resp.history# 查看被重定向之前的响应头

13. 超时处理

默认的IO操作都有5分钟的响应时间 我们可以通过 timeout 进行重写:

async with session.get('', timeout=60) as r:...

如果 timeout=None 或者 timeout=0 将不进行超时检查,也就是不限时长。

14. ClientSession 用于在多个连接之间(同一网站)共享cookie,请求头等

import asyncioimport aiohttpasync def func1():cookies = {'my_cookie': "my_value"}async with aiohttp.ClientSession(cookies=cookies) as session:async with session.get("/q/1010000007987098") as r:print(session.cookie_jar.filter_cookies(""))async with session.get("/hottest") as rp:print(session.cookie_jar.filter_cookies(""))if __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

Set-Cookie: PHPSESSID=web2~d8grl63pegika2202s8184ct2qSet-Cookie: my_cookie=my_valueSet-Cookie: PHPSESSID=web2~d8grl63pegika2202s8184ct2qSet-Cookie: my_cookie=my_value

我们最好使用session.cookie_jar.filter_cookies()获取网站cookie,不同于requests模块,虽然我们可以使用rp.cookies有可能获取到cookie,但似乎并未获取到所有的cookies。

import asyncioimport aiohttpasync def func1():cookies = {'my_cookie': "my_value"}async with aiohttp.ClientSession(cookies=cookies) as session:async with session.get("/q/1010000007987098") as rp_1:print('1' * 50)print(session.cookie_jar.filter_cookies(""))# 首次访问会获取网站设置的 cookie# Set-Cookie: PHPSESSID=web2~jh3ouqoabvr4e72f87vtherkp6; Domain=; Path=/print('2' * 50)print(f'rp_1.cookies:{rp_1.cookies}')async with session.get("/hottest") as rp_2:print('3' * 50)print(session.cookie_jar.filter_cookies(""))print('4' * 50)print(f'rp_2.cookies:{rp_2.cookies}') # 为空,服务端未设置 cookieasync with session.get("/newest") as rp_3:print('5' * 50)print(session.cookie_jar.filter_cookies(""))print('6' * 50)print(f'rp_3.cookies:{rp_3.cookies}') # 为空,服务端未设置cookieif __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

运行结果:

11111111111111111111111111111111111111111111111111Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9Set-Cookie: my_cookie=my_value22222222222222222222222222222222222222222222222222rp_1.cookies:Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9; Domain=; Path=/33333333333333333333333333333333333333333333333333Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9Set-Cookie: my_cookie=my_value44444444444444444444444444444444444444444444444444rp_2.cookies:55555555555555555555555555555555555555555555555555Set-Cookie: PHPSESSID=k8s~5760e27c506e738ac8fb39ed309111f9Set-Cookie: my_cookie=my_value66666666666666666666666666666666666666666666666666rp_3.cookies:

总结:

当我们使用 rp.cookie 时,只会获取到当前 url 下设置的 cookie,不会维护整站的cookie而session.cookie_jar.filter_cookies("")会一直保留这个网站的所有设置cookies,含有我们在会话时设置的cookie,并且会根据响应修改更新cookie。这个才是我们需要的而我们设置cookie,也是需要在aiohttp.ClientSession(cookies=cookies)中设置

ClientSession 还支持 请求头,keep-alive连接和连接池(connection pooling)

15. cookie的安全性

默认 ClientSession 使用的是严格模式的 aiohttp.CookieJar. RFC 2109,明确的禁止接受 url 和 ip 地址产生的 cookie,只能接受 DNS 解析 IP 产生的 cookie。可以通过设置 aiohttp.CookieJar 的 unsafe=True 来配置:

jar = aiohttp.CookieJar(unsafe=True)session = aiohttp.ClientSession(cookie_jar=jar)

16. 控制同时连接的数量(连接池)

TCPConnector 维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求

import asyncioimport aiohttpasync def func1():cookies = {'my_cookie': "my_value"}conn = aiohttp.TCPConnector(limit=2) # 默认100,0表示无限async with aiohttp.ClientSession(cookies=cookies, connector=conn) as session:for i in range(7, 35):url = "/list-%s-1.html" % iasync with session.get(url) as rp:print('---------------------------------')print(rp.status)if __name__ == '__main__':tasks = [func1(), ]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

限制同时打开连接到同一端点的数量( (host, port, is_ssl) 三的倍数),可以通过设置 limit_per_host 参数:

limit_per_host: 同一端点的最大连接数量。同一端点即(host, port, is_ssl)完全相同

conn = aiohttp.TCPConnector(limit_per_host=30)#默认是0

在协程下测试效果不明显

17. 自定义域名解析地址

我们可以指定域名服务器的 IP 对我们提供的get或post的url进行解析:

from aiohttp.resolver import AsyncResolverresolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])conn = aiohttp.TCPConnector(resolver=resolver)

18. 设置代理

aiohttp支持使用代理来访问网页:

async with aiohttp.ClientSession() as session:async with session.get("", proxy="") as resp:print(resp.status)

当然也支持需要授权的页面:

async with aiohttp.ClientSession() as session:proxy_auth = aiohttp.BasicAuth('user', 'pass') # 用户,密码async with session.get("", proxy="", proxy_auth=proxy_auth) as resp:print(resp.status)

或者通过这种方式来验证授权:

session.get("", proxy="http://user:pass@")

19. post传递数据的方法

(1)模拟表单

payload = {'key1': 'value1', 'key2': 'value2'}async with session.post('/post', data=payload) as resp:print(await resp.text())

注意:data=dict 的方式 post 的数据将被转码,和 form 提交数据是一样的作用,如果你不想被转码,可以直接以字符串的形式 data=str 提交,这样就不会被转码。

(2)post json

payload = {'some': 'data'}async with session.post(url, data=json.dumps(payload)) as resp:

其实json.dumps(payload)返回的也是一个字符串,只不过这个字符串可以被识别为json格式

(3)post 小文件

url = '/post'files = {'file': open('report.xls', 'rb')}await session.post(url, data=files)

url = '/post'data = FormData()data.add_field('file', open('report.xls', 'rb'), filename='report.xls', content_type='application/vnd.ms-excel')await session.post(url, data=data)

如果将文件对象设置为数据参数,aiohttp将自动以字节流的形式发送给服务器。

(4)post 大文件

aiohttp支持多种类型的文件以流媒体的形式上传,所以我们可以在文件未读入内存的情况下发送大文件。

@aiohttp.streamerdef file_sender(writer, file_name=None):with open(file_name, 'rb') as f:chunk = f.read(2 ** 16)while chunk:yield from writer.write(chunk)chunk = f.read(2 ** 16)# Then you can use `file_sender` as a data provider:async with session.post('/post', data=file_sender(file_name='huge_file')) as resp:print(await resp.text())

(5)从一个url获取文件后,直接post给另一个url

r = await session.get('')await session.post('/post',data=r.content)

(6)post 预压缩数据

在通过aiohttp发送前就已经压缩的数据, 调用压缩函数的函数名(通常是deflate 或 zlib)作为content-encoding的值:

import zlibasync def my_coroutine(session, headers, my_data):data = press(my_data)headers = {'Content-Encoding': 'deflate'}async with session.post('/post', data=data, headers=headers):pass

Python 协程爬虫 --- aiohttp + aiomultiprocess 使用

aiohttp 是基于 asyncio 的一个异步http客户端和服务器

官方文档:https://aiohttp.readthedocs.io/en/stable/client_quickstart.html

aiomultiprocess:/omnilib/aiomultiprocess

简单实用例子

async def funct(index): print("start ", index) async with aiohttp.ClientSession() as session: async with session.get("/top250?start=0", timeout=5) as resp: print(resp.status)print(await resp.text()) print("end ", index)

aiohttp.ClientSession()

创建会话,session 提供了各种请求方法,如 get、post、delete、put 等。认识新的关键字 async with,因为是协程的上下文管理,所以多了async关键字。这个不是强制使用的,你也可以自己手动关闭会话,但是一定要记得关闭

注意:

1、不要为每个请求创建会话。每个应用程序很可能需要一个会话来执行所有请求。2、aiohttp在发送请求之前在内部执行 URL规范化。要禁用规范化,请使用encoded=True参数进行URL构建

获取响应信息

resp.status # 状态码await resp.text() # 获取响应正文,可以指定编码await resp.read() # 读取二进制响应内容await resp.json() # 获取json响应内容await resp.content.read(size)# 读取流

注意事项:aiohttp 是在 await resp.text() 之后才发起请求的,所以必须调用之后才能获取响应的内容,不然会出现异常 aiohttp.client_exceptions.ClientConnectionError: Connection closed

aiomultiprocess

asyncio 和多处理本身是有用的,但有局限性:asyncio 仍然不能超过 GIL 的速度,并且多处理一次只能处理一个任务。但是,他们在一起可以充分实现自己的真正潜力。

aiomultiprocess 提供了一个简单的界面,同时在每个子进程上运行完整的 asyncio 事件循环,从而实现了 Python 应用程序从未有过的并发级别。每个子进程可以一次执行多个协程,仅受工作量和可用内核数限制。

注:aiomultiprocess 需要 Python 3.6 或更高版本

import asynciofrom aiohttp import requestfrom aiomultiprocess import Poolasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():urls = ["https://jreese.sh", ...]async with Pool() as pool:async for result in pool.map(get, urls):... # process resultif __name__ == '__main__':# Python 3.7asyncio.run(main())# Python 3.6# loop = asyncio.get_event_loop()# loop.run_until_complete(main())

用法:在子进程中执行协程

import asynciofrom aiohttp import requestfrom aiomultiprocess import Processasync def put(url, params):async with request("PUT", url, params=params) as response:passasync def main():p = Process(target=put, args=("https://jreese.sh", {}))await pif __name__ == "__main__":asyncio.run(main())

如果您想从协程中获取结果Worker,请使用以下方法:

import asynciofrom aiohttp import requestfrom aiomultiprocess import Workerasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():p = Worker(target=get, args=("https://jreese.sh",))response = await pif __name__ == "__main__":asyncio.run(main())

如果您需要一个托管的工作进程池,请使用Pool

import asynciofrom aiohttp import requestfrom aiomultiprocess import Poolasync def get(url):async with request("GET", url) as response:return await response.text("utf-8")async def main():urls = ["https://jreese.sh", ...]async with Pool() as pool:result = await pool.map(get, urls)if __name__ == "__main__":asyncio.run(main())

示例:

import timeimport jsonimport datetimeimport asyncioimport hashlibfrom aiomultiprocess import Poolfrom redis import *from pybloom_live import BloomFilterimport aiohttp## Public variable#Bloom_data = BloomFilter(1000000000, 0.01)DB_get_question = StrictRedis(host='62.234.9.254', port=6480, password='lingmiao', db=4)pipeline_redis = DB_get_question.pipeline()## Public functions#def md5(data):"""对数据进行MD5加密:param data::return:"""md5_qa = hashlib.md5(data.encode('utf8')).hexdigest()md5_qa = bytes(md5_qa, encoding='utf8')return md5_qaasync def get(data):"""协程函数:param url::return:"""# while True:# print('data:',data)# try:url = ''async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:get_proxy = DB_get_question.spop('IP_PROXY')response = await session.post(url, json=data, timeout=7, proxy={"http": "http://{}".format(get_proxy)})result = await response.text()hjson = json.loads(result)content = hjson['results'][0]['values']['text']# print('data:',data)print('\033[32;1mget_question\033[0m:', content)await asyncio.sleep(0.1)return content# except:#open('error_url.txt','a').write(url + '\n')#await get(data)async def request():"""使用进程加异步协程发送请求:return:"""key_number = 0datas = ['']split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '').replace('b', '')while len(datas) != 0:key_number += 1if len(datas) > 1:async with Pool() as pool:get_proxy = DB_get_question.spop('IP_PROXY')result_list = await pool.map(get, datas)# print(result_list)for result in result_list:if result:# print('key',key)# print('phone', phone)if '请求次数' in result or 'key不对' in result or '请求内容为空' in result:split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '')breakmd5_qa = md5(result)if md5_qa not in Bloom_data:Bloom_data.add(md5_qa)#pipeline_redis.lpush('total_question_list', result)pipeline_redis.sadd('get_question', result)pipeline_redis.execute()datas.clear()question_number = 0while True:question_number += 1pipeline_redis.spop('original_question_set')if question_number == 100:question_list = pipeline_redis.execute()breakdatas = {}print('datas', datas)print(datas)if key_number == 500:split_key = DB_get_question.spop('key2_set').decode('utf8').split(': ')key = split_key[-1].replace('\'', '').replace('}', '')phone = split_key[0].replace('\'', '').replace('{', '')key2_set_number = DB_get_question.scard('key2_set')if key2_set_number < 5:with open('key2_total.txt', 'r') as f_key:for key in f_key:key = key.strip()pipeline_redis.sadd('key2_set', key)pipeline_redis.execute()key_number = 0coroutine = request()task = asyncio.ensure_future(coroutine)loop = asyncio.get_event_loop()loop.run_until_complete(task)

基于 asyncio、aiohttp、xpath 的异步爬虫

参看:/weixin_34290390/article/details/88772610

# asyncio爬虫, 去重, 入库import asyncioimport reimport aiohttpimport aiomysqlfrom pyquery import PyQuerystopping = Falsestart_url = ''waiting_urls = []seen_urls = set() # 实际使用爬虫去重时,数量过多,需要使用布隆过滤器async def fetch(url, session):async with aiohttp.ClientSession() as session:try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:data = await resp.text()return dataexcept Exception as e:print(e)def extract_urls(html): # html中提取所有urlurls = []pq = PyQuery(html)for link in pq.items('a'):url = link.attr('href')if url and url.startwith('http') and url not in seen_urls:urls.append(url)waiting_urls.append(urls)return urlsasync def init_urls(url, session):html = await fetch(url, session)seen_urls.add(url)extract_urls(html)async def article_handler(url, session, pool): # 获取文章详情并解析入库html = await fetch(url, session)extract_urls(html)pq = PyQuery(html)title = pq('title').text() # 为了简单, 只获取title的内容async with pool.acquire() as conn:async with conn.cursor() as cur:await cur.execute('SELECT 42;')insert_sql = "insert into article_test(title) values('{}')".format(title)await cur.execute(insert_sql) # 插入数据库# print(cur.description)# (r,) = await cur.fetchone()# assert r == 42async def consumer(pool):async with aiohttp.ClientSession() as session:while not stopping:if len(waiting_urls) == 0: # 如果使用asyncio.Queue的话, 不需要我们来处理这些逻辑。await asyncio.sleep(0.5)continueurl = waiting_urls.pop()print('start get url:{}'.format(url))if re.match(r'http://.*?/\d+/', url):if url not in seen_urls: # 是没有处理过的url,则处理asyncio.ensure_future(article_handler(url, sssion, pool))else:if url not in seen_urls:asyncio.ensure_future(init_urls(url))async def main(loop):# 等待mysql连接建立好pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='',db='aiomysql_test', loop=loop, charset='utf8', autocommit=True)# charset autocommit必须设置, 这是坑, 不写数据库写入不了中文数据async with aiohttp.ClientSession() as session:html = await fetch(start_url, session)seen_urls.add(start_url)extract_urls(html)asyncio.ensure_future(consumer(pool))if __name__ == '__main__':event_loop = asyncio.get_event_loop()asyncio.ensure_future(main(event_loop))event_loop.run_forever()

改 进( 生产者 --- 消费者 从 redis 取数据)

config.py

import osimport syssys.path.append(os.getcwd())sys.path.append("..")sys.path.append(os.path.abspath("../../"))DEV_OR_PRD = 'dev'REDIS_CONFIG = Noneif 'dev' == DEV_OR_PRD.lower():REDIS_CONFIG = {'host': '127.0.0.1','port': 6379,'db': 0,'password': None}passelif 'prd' == DEV_OR_PRD.lower():REDIS_CONFIG = {'host': '127.0.0.1','port': 6379,}pass

produce.py

import redisimport jsonimport requestsfrom config import REDIS_CONFIGdef add_task():payload = {}headers = {'Host': '','csrf': 'E7D1IGX45D','Cookie': 'kw_token=E7D1IGX45D; kw_token=AKH1VOZ2767'}queue_name = 'redis_list:test'redis_conn = redis.StrictRedis(**REDIS_CONFIG)for _ in range(100):for page_num in range(1, 11):url = f"/api/www/bang/bang/musicList?bangId=16&pn={page_num}&rn=30"response = requests.request("GET", url, headers=headers, data=payload)try:music_list = response.json()['data']['musicList']except BaseException as e:music_list = []for item in music_list:task = {'crawl_url': item['pic'],'song_name': item['name']}redis_conn.lpush(queue_name, json.dumps(task, ensure_ascii=False))print(f'task:{task}')print(f'page {page_num} end')if __name__ == '__main__':add_task()pass

consumer.py

import reimport asyncioimport aiohttpimport redisimport jsonimport datetimefrom config import REDIS_CONFIGmax_worker = 50current_worker = 0def write_file(future=None):resp_data = future.result()if not resp_data:returntime_int = int(datetime.datetime.now().timestamp() * 1000)with open(f'./img/{time_int}.jpg', 'wb') as f:f.write(resp_data)async def fetch(url, session):global current_workercurrent_worker += 1try:async with session.get(url) as resp:print('url status: {}'.format(resp.status))if resp.status in [200, 201]:# data = await resp.text()data = await resp.read()current_worker -= 1return dataexcept Exception as e:print(e)return Noneasync def consumer(redis_conn=None, queue_name=None):async with aiohttp.ClientSession() as session:while True:task_string = redis_conn.rpop(queue_name)cha = current_worker - max_workerif cha >= 0 or not task_string:print('超过最大worker, 或者任务为空, 睡1秒继续。。。')await asyncio.sleep(1)continuetask_string = task_string.decode('utf-8')task_dict = json.loads(task_string)crawl_url = task_dict['crawl_url']asyncio_task = asyncio.ensure_future(fetch(crawl_url, session))asyncio_task.add_done_callback(write_file)# await asyncio.sleep(0.001)def main():queue_name = 'redis_list:test'redis_conn_pool = redis.ConnectionPool(**REDIS_CONFIG)redis_conn = redis.StrictRedis(connection_pool=redis_conn_pool)event_loop = asyncio.get_event_loop()try:event_loop.run_until_complete(consumer(redis_conn=redis_conn, queue_name=queue_name))except KeyboardInterrupt as e:event_loop.stop()event_loop.run_forever()finally:event_loop.close()redis_conn.close()if __name__ == '__main__':main()

2. asyncio + aiohttp

import aiohttpimport asyncioasync def fetch_async(url):print(url)async with aiohttp.request("GET", url) as r:response = await r.text(encoding="utf-8")# 或者直接await r.read()不编码,直接读取,适合于图像等无法编码文件# data = await r.read()# print(url, data)print(url, r.status)print(url, response)if __name__ == '__main__':tasks = [fetch_async('/'),fetch_async('/')]event_loop = asyncio.get_event_loop()results = event_loop.run_until_complete(asyncio.gather(*tasks))# event_loop.close()

示例:

参考:深入理解协程(四):async/await异步爬虫实战:/ghostlee/p/12208564.html

import asyncioimport timeimport aiohttpfrom lxml import etreeurls = ['/Jmilk/article/details/103218919','/stven_king/article/details/103256724','/csdnnews/article/details/103154693','/dg_lee/article/details/103951021','/m0_37907797/article/details/103272967','/zzq900503/article/details/49618605','/weixin_44339238/article/details/103977138','/dengjin4042056/article/details/103930275','/Mind_programmonkey/article/details/103940511','/xufive/article/details/102993570','/weixin_41010294/article/details/104009722','/yunqiinsight/article/details/103137022','/qq_44210563/article/details/102826406',]async def async_get_url(url):headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ''(KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36'}async with aiohttp.ClientSession() as session: # 解释1async with session.get(url, headers=headers) as r:html = await r.read()try:title = etree.HTML(html).xpath('//h1[@class="title-article"]/text()')[0]print(title)except IndexError:print(f'Fail URL: {r.url}')def async_main():loop = asyncio.get_event_loop()tasks = [async_get_url(url) for url in urls]loop.run_until_complete(asyncio.wait(tasks))# loop.close()if __name__ == '__main__':start = time.time()async_main()print(f'cost time: {time.time() - start}s')

运行结果:

Fail URL: /weixin_44339238/article/details/103977138网页实现一个简单的音乐播放器(大佬别看。(⊙﹏⊙))AES中ECB模式的加密与解密(Python3.7)【程序人生】程序员接私活常用平台汇总OOM别慌,手把手教你定位致 Python 初学者【图解算法面试】记一次面试:说说游戏中的敏感词过滤是如何实现的?8年经验面试官详解 Java 面试秘诀4G EPS 的网络协议栈【历史总结】Android-Universal-Image-Loader源码分析你不得不了解的卷积神经网络发展史java进阶(四)------java编程规范---代码质量检测工具FindBugs、PMD和CheckStyle的安装中国数据库OceanBase登顶之路cost time: 0.5409884452819824s

解释1:此处为异步的上下文管理器,是aiohttp官方文档提供的写法。如果对上下文管理器不是很了解的话,可以参看【吃透Python上下文管理器】。

用时:0.5409884452819824s。从两种爬虫的输出结果中可以看到:

文章标题的顺序不同。同步爬虫会按照urls内部的url顺序依次爬取文章标题。而异步爬虫爬取的顺序并不完全和urls中的url顺序相同。爬取速度差异很大。异步爬虫速度大概是普通同步爬虫的8~10倍。异步爬虫充分利用了网络请求这段时间。从而提高了爬取效率。

关于aiohttp的更多用法,推荐阅读

深入理解协程(一):协程的引入

深入理解协程(二):yield from实现异步协程

深入理解协程(三):async/await实现异步协程

Python进阶:上下文管理器

3. asyncio + requests

import asyncioimport requestsasync def fetch_async(func, *args, **kwargs):inner_loop = asyncio.get_event_loop()future = inner_loop.run_in_executor(None, func, *args)response = await futureprint(response.url, response.content)if __name__ == '__main__':tasks = [fetch_async(requests.get, '/wupeiqi/'),fetch_async(requests.get, '/pic/show?nid=4073644713430508&lid=10273091')]loop = asyncio.get_event_loop()results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()

示例:

import functools # at the top with the other importsimport asyncioimport requestsimport urllib3urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)async def fetch_async(lp, func, *args, **kwargs):# inner_loop = asyncio.get_event_loop()# future = inner_loop.run_in_executor(None, func, *args)print(f'*args: {args}')future = lp.run_in_executor(None, functools.partial(func, *args, **kwargs))response = await futureprint(response.url, response.content)if __name__ == '__main__':loop = asyncio.get_event_loop()tasks = [fetch_async(loop, requests.get, '/wupeiqi/', verify=False),fetch_async(loop, requests.get, '/pic/show?nid=4073644713430508&lid=10273091')]results = loop.run_until_complete(asyncio.gather(*tasks))loop.close()

示例:

task协程Future桥梁

import requestsimport asynciofrom concurrent import futures'''使用多线程:在协程中集成阻塞io原型 :awaitable loop.run_in_executor(executor, func, *args) 参数 : executor 可以是 ThreadPoolExecutor / ProcessPool , 如果是None 则使用默认线程池可使用 yield from 或 await 挂起函数作用 : 例如在异步事件循环中的写文件操作(或者其他IO操作), 这种慢的操作交给线程去做'''def get_url(r_url=None):r = requests.get(url=r_url, verify=False)print(r.json())if __name__ == "__main__":loop = asyncio.get_event_loop()executor = futures.ThreadPoolExecutor(3)tasks = []for index in range(20):url = f"/goods/{index}/"task = loop.run_in_executor(executor, get_url, url)tasks.append(task)loop.run_until_complete(asyncio.wait(tasks))

示例:

import asynciofrom concurrent import futuresdef block_func():with open("c:/test.txt", 'rb') as fd:return fd.read(500)async def todo(lp: asyncio.AbstractEventLoop):reader = await lp.run_in_executor(None, block_func) # 默认线程池print("reader:", reader)with futures.ThreadPoolExecutor() as ex:reader = await lp.run_in_executor(ex, block_func) # 自己创建一个线程池让事件循环调用print("reader :", reader)loop = asyncio.get_event_loop()loop.run_until_complete(todo(loop))

4. gevent + requests

from gevent import monkeymonkey.patch_all()import geventimport requestsimport urllib3urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)def fetch_async(method, url, **req_kwargs):print(method, url, req_kwargs)response = requests.request(method=method, url=url, **req_kwargs)# print(f'{response.url}, {response.content}')print(f'{response.url}, {response.status_code}')def test_1():# ##### 发送请求 #####gevent.joinall([gevent.spawn(fetch_async, method='get', url='/', verify=False),gevent.spawn(fetch_async, method='get', url='/', verify=False),gevent.spawn(fetch_async, method='get', url='/', verify=False),])def test_2():# #### 发送请求(协程池控制最大协程数量) #####from gevent.pool import Poolpool = Pool(None)gevent.joinall([pool.spawn(fetch_async, method='get', url='/', verify=False),pool.spawn(fetch_async, method='get', url='/', verify=False),pool.spawn(fetch_async, method='get', url='/', verify=False),])if __name__ == '__main__':# test_1()test_2()

5. grequests

import grequestsrequest_list = [grequests.get('/delay/1', timeout=0.001),grequests.get('http://fakedomain/'),grequests.get('/status/500')]# ##### 执行并获取响应列表 ###### response_list = grequests.map(request_list)# print(response_list)# ##### 执行并获取响应列表(处理异常) #####def exception_handler(request, exception):print(request, exception)print("Request failed")response_list = grequests.map(request_list, exception_handler=exception_handler)print(response_list)

6. Twisted 示例

twisted学习笔记No.3 Web Clients:/tracylining/p/3353808.html

from twisted.web.client import Agentfrom twisted.web.client import deferfrom twisted.internet import reactordef all_done(arg):reactor.stop()def callback(contents):print(contents)deferred_list = []url_list = ['', '', ]for url in url_list:agent = Agent(reactor)d = agent.request(b'GET', url.encode('utf-8'))# d.addCallbacks(printResource, printError)d.addCallback(callback)deferred_list.append(d)d_list = defer.DeferredList(deferred_list)d_list.addBoth(all_done)reactor.run()

7. Tornado

from tornado.httpclient import AsyncHTTPClientfrom tornado.httpclient import HTTPRequestfrom tornado import ioloopfrom tornado.httpclient import HTTPResponsedef handle_response(response: HTTPResponse):"""处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop():param response::return:"""if response.error:print("Error:", response.error)else:# print(response.body)print(f'{response.effective_url} : status_code : {response.code}')def func():url_list = ['','',]for url in url_list:print(url)http_client = AsyncHTTPClient()http_client.fetch(HTTPRequest(url), handle_response)ioloop.IOLoop.current().add_callback(func)ioloop.IOLoop.current().start()

8. Twisted 更多

from twisted.internet import reactorfrom twisted.web.client import getPagefrom urllib import parsedef one_done(arg: bytes):print(arg.decode('utf-8'))reactor.stop()post_data = parse.urlencode({'check_data': 'adf'})post_data = bytes(post_data, encoding='utf8')headers = {b'Content-Type': b'application/x-www-form-urlencoded'}response = getPage(bytes('/login', encoding='utf8'),method=bytes('POST', encoding='utf8'), postdata=post_data, cookies={}, headers=headers)response.addBoth(one_done)reactor.run()

以上均是 Python 内置以及第三方模块提供异步IO请求模块,使用简便大大提高效率,而对于异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】.

9.史上最牛逼的异步IO模块( select、poll、epoll)

select

import selectimport socketimport timeclass AsyncTimeoutException(TimeoutError):"""请求超时异常类"""def __init__(self, msg):self.msg = msgsuper(AsyncTimeoutException, self).__init__(msg)class HttpContext(object):"""封装请求和相应的基本数据"""def __init__(self, sock, host, port, method, url, data, callback, timeout=5):"""sock: 请求的客户端socket对象host: 请求的主机名port: 请求的端口port: 请求的端口method: 请求方式url: 请求的URLdata: 请求时请求体中的数据callback: 请求完成后的回调函数timeout: 请求的超时时间"""self.sock = sockself.callback = callbackself.host = hostself.port = portself.method = methodself.url = urlself.data = dataself.timeout = timeoutself.__start_time = time.time()self.__buffer = []def is_timeout(self):"""当前请求是否已经超时"""current_time = time.time()if (self.__start_time + self.timeout) < current_time:return Truedef fileno(self):"""请求sockect对象的文件描述符,用于select监听"""return self.sock.fileno()def write(self, data):"""在buffer中写入响应内容"""self.__buffer.append(data)def finish(self, exc=None):"""在buffer中写入响应内容完成,执行请求的回调函数"""if not exc:response = b''.join(self.__buffer)self.callback(self, response, exc)else:self.callback(self, None, exc)def send_request_data(self):content = """%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s""" % (self.method.upper(), self.url, self.host, self.data,)return content.encode(encoding='utf8')class AsyncRequest(object):def __init__(self):self.fds = []self.connections = []def add_request(self, host, port, method, url, data, callback, timeout):"""创建一个要请求"""client = socket.socket()client.setblocking(False)try:client.connect((host, port))except BlockingIOError as e:pass# print('已经向远程发送连接的请求')req = HttpContext(client, host, port, method, url, data, callback, timeout)self.connections.append(req)self.fds.append(req)def check_conn_timeout(self):"""检查所有的请求,是否有已经连接超时,如果有则终止"""timeout_list = []for context in self.connections:if context.is_timeout():timeout_list.append(context)for context in timeout_list:context.finish(AsyncTimeoutException('请求超时'))self.fds.remove(context)self.connections.remove(context)def running(self):"""事件循环,用于检测请求的socket是否已经就绪,从而执行相关操作"""while True:r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)if not self.fds:returnfor context in r:sock = context.sockwhile True:try:data = sock.recv(8096)if not data:self.fds.remove(context)context.finish()breakelse:context.write(data)except BlockingIOError as e:breakexcept TimeoutError as e:self.fds.remove(context)self.connections.remove(context)context.finish(e)breakfor context in w:# 已经连接成功远程服务器,开始向远程发送请求数据if context in self.fds:data = context.send_request_data()context.sock.sendall(data)self.connections.remove(context)self.check_conn_timeout()if __name__ == '__main__':def callback_func(context, response, ex):""":param context: HttpContext对象,内部封装了请求相关信息:param response: 请求响应内容:param ex: 是否出现异常(如果有异常则值为异常对象;否则值为None):return:"""print(context, response, ex)obj = AsyncRequest()url_list = [{'host': '', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},{'host': '', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},{'host': '', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,'callback': callback_func},]for item in url_list:print(item)obj.add_request(**item)obj.running()

示例 2:

import socketimport selectclass HttpRequest:def __init__(self, sk, host, callback):self.socket = skself.host = hostself.callback = callbackdef fileno(self):return self.socket.fileno()class AsyncRequest:def __init__(self):self.conn = []self.connection = [] # 用于检测是否已经连接成功def add_request(self, host, callback):try:sk = socket.socket()sk.setblocking(0)sk.connect((host, 80,))except BlockingIOError as e:passrequest = HttpRequest(sk, host, callback)self.conn.append(request)self.connection.append(request)def run(self):while True:rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)for w in wlist:print(w.host, '连接成功...')# 只要能循环到,表示socket和服务器端已经连接成功tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)w.socket.send(bytes(tpl, encoding='utf-8'))self.connection.remove(w)for r in rlist:# r,是HttpRequestrecv_data = bytes()while True:try:chunck = r.socket.recv(8096)recv_data += chunckexcept Exception as e:breakr.callback(recv_data)r.socket.close()self.conn.remove(r)if len(self.conn) == 0:breakdef f1(data):print('保存到文件', data)def f2(data):print('保存到数据库', data)url_list = [{'host': '', 'callback': f1},{'host': '', 'callback': f2},{'host': '', 'callback': f2},]req = AsyncRequest()for item in url_list:req.add_request(item['host'], item['callback'])req.run()

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。