300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > Python实现多线程 多进程爬虫

Python实现多线程 多进程爬虫

时间:2019-03-24 09:56:05

相关推荐

Python实现多线程 多进程爬虫

1.多线程的线程守护

在python3中,主线程主进程结束,子进程不会结束,为了能够让主线程回收子线程,可以把子线程设置为守护线程,即该线程不重要,主线程结束,子线程结束:

举个例子:

import timeimport threadingdef test():while True:print('测试线程守护!!',threading.currentThread())time.sleep(1)if __name__ == '__main__':t1 = threading.Thread(target=test)t2 = threading.Thread(target=test)t1.start()t2.start()

执行以上代码,得出的结果是交替打印出两个线程,由于线程执行的函数为死循环,所以子线程也会陷入死循环,不管你怎么按ctrl+c也无法停止线程。如下:

(venv) E:\test>python test.py测试线程守护!! <Thread(Thread-1, started 9652)>测试线程守护!! <Thread(Thread-2, started 2728)>测试线程守护!! <Thread(Thread-1, started 9652)>测试线程守护!! <Thread(Thread-2, started 2728)>测试线程守护!! <Thread(Thread-1, started 9652)>测试线程守护!! <Thread(Thread-2, started 2728)>测试线程守护!! <Thread(Thread-1, started 9652)>测试线程守护!! <Thread(Thread-2, started 2728)>测试线程守护!! <Thread(Thread-1, started 9652)>测试线程守护!! <Thread(Thread-2, started 2728)>测试线程守护!! <Thread(Thread-1, started 9652)>测试线程守护!! <Thread(Thread-2, started 2728)>测试线程守护!! <Thread(Thread-1, started 9652)>测试线程守护!! <Thread(Thread-2, started 2728)>

那么怎样才能可以避免这种问题呢?或者说,怎样才能在主线程退出的时候,子线程也自动退出呢?没错就是使用线程守护,也就是等到主线程执行完,子线程也强制停止,换句话说就是子线程会随着主线程一起结束。使用案例:

t1 = threading.Thread(targe=func,args=(,))t1.setDaemon(True)t1.start() #此时线程才会启动

优化前面死循环不能停止的线程,源码如下:

import timeimport threadingdef test():while True:print('测试线程守护!!',threading.currentThread())time.sleep(1)if __name__ == '__main__':t1 = threading.Thread(target=test)t2 = threading.Thread(target=test)t1.setDaemon(True)t1.start()t2.setDaemon(True)t2.start()

输出结果:

(venv) E:\test>python test.py测试线程守护!! <Thread(Thread-1, started daemon 5884)>测试线程守护!! <Thread(Thread-2, started daemon 7112)>

可以看到,运行一次就直接退出了,因为主线程已经执行完了,确实是主线程已经结束了,正因为设置了守护线程,所以这时候子线程也一并退出了。

2.队列模块的使用

from queue import Queueq = Queue(maxsize=100)item = {}q.put_nowait(item) #不等待直接放,队列满的时候会报错q.put(item) #放入数据,队列满的时候会等待q.get_nowait() #不等待直接取,队列空的时候会报错q.get() #取出数据,队列为空的时候会等待q.qsize() #获取队列中现存数据的个数 q.join() #队列中维持了一个计数,计数不为0时候让主线程阻塞等待,队列计数为0的时候才会继续往后执行q.task_done() # put的时候计数+1,get不会-1,get需要和task_done 一起使用才会-1

3. 线程中使用队列

## 队列可用于线程间的数据通讯from queue import Queueimport threadingq = Queue()def add_to_queue():for i in range(0, 100):print("存入队列: {}".format(i))q.put(i)def get_from_queue():# 但是在我们获取队列元素的时候, 我们并不知道队列中放了几个元素,# 这个时候我们就会使用while的死循环来获取,知道取完为止# for i in range(0, 100):while True:print("从队列中取出: {}".format(q.get()))q.task_done()# 创建线程t = threading.Thread(target=add_to_queue)# 设置为守护线程t.setDaemon(True)# 启动线程t.start()t = threading.Thread(target=get_from_queue)t.setDaemon(True)t.start()# 队列加入主线线程, 等待队列中任务完成为止q.join()

4. 装饰器的基本使用

装饰器的作用: 不改变原有函数,给函数添加额外功能

# 定义装饰一个死循环执行任务的装饰器def run_forever(func):def forever(obj):while True:func(obj)return forever# 使用装饰器@run_forever # 等同于 run_forever(run)def run(obj):print("人生苦短,我用Python {}".format(obj))# 执行函数 --死循环run('Hpayy')

糗事百科多线程

实现步骤:
1.在init方法中, 创建 URL队列, 响应队列, 数据队列2.在生成URL列表中方法中,把URL添加URL队列中3.在请求页面的方法中,从URL队列中取出URL执行,把获取到的响应数据添加响应队列中4.在处理数据的方法中,从响应队列中取出页面内容进行解析, 把解析结果存储数据队列中5.在保存数据的方法中, 从数据队列中取出数据,进行保存6.开启几个线程来执行上面的方法
注意: 开的线程数取决于这个任务耗时, 耗时长的就多开几个线程.

多线程爬虫源码:

import threadingimport requestsfrom lxml import etreeimport jsonfrom queue import Queue# 多线程思路:# 把所有待采集得链接url放进一个url队列里面# 在请求页面得过程中,以多线程得方式从队列中取出url,请求结束后要执行task_done()方法来释放队列避免重复采集# 为了更快得处理相应后数据(页面数据),也可以把每个页面获得得页面源码放在相应队列中,一样处理完得task_done()来清队列# 最后得保存数据也可以开一个队列用于保存# 可以设置线程数量来处理数据# 由于请求、处理数据需要较多次数的轮循,所以可以直接写个修饰器函数用于循环某个函数def run_forever(func):def wrapper(obj):while True:func(obj)return wrapperclass CrawlData:def __init__(self):self.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36'}self.url_pattern = '/8hr/page/{}/'self.url_queue = Queue() # url 队列self.page_queue= Queue() # 页面 队列self.data_queue = Queue() # 数据 队列# 把待采集的url放入队列def add_url_to_queue(self):for i in range(1, 14):self.url_queue.put(self.url_pattern.format(i))@run_foreverdef add_page_to_queue(self):url = self.url_queue.get() # 从队列中获取一个url,get不会做-1操作result = requests.get(url, headers=self.headers)if result.status_code != 200:self.url_queue.put(url)else:self.page_queue.put(result.text)#结束当前url任务self.url_queue.task_done()@run_foreverdef add_data_to_queue(self):# 从队列中获取一个页面数据出来待处理page = self.page_queue.get()element = etree.HTML(page)div_s = element.xpath('//div[@class="recommend-article"]/ul/li')# 遍历分组列表, 再使用xpath获取内容dz_list = []for div in div_s:item = {}# 每个段子包含发送人头像URL, 昵称, 性别, 段子内容, 好笑数,评论数# 头像URLitem['head_url'] = self.get_first_element(div.xpath('.//a[@class="recmd-user"]/img/@src'))if item['head_url'] is not None:item['head_url'] = 'http:' + item['head_url']# 昵称item['author_name'] = self.get_first_element(div.xpath('.//a[@class="recmd-user"]/span/text()'))dz_list.append(item)# 解析好得数据放入队列self.data_queue.put(dz_list)# 结束当前页面数据任务self.page_queue.task_done()def get_first_element(self, list):'''获取列表中第一个元素,如果是空列表就返回None'''return list[0] if len(list) != 0 else None# 保存数据@run_foreverdef save_data(self):# 从已有的数据中获取一个处理后的数据进行存储data = self.data_queue.get()with open('qiubai.json', 'a', encoding='utf8') as f:for dt in data:json.dump(dt, f, ensure_ascii=False)f.write('\n')self.data_queue.task_done() # 结束当前数据任务def run_count_use(self, func, count):# 开启多个线程处理for i in range(count):t = threading.Thread(target=func)t.setDaemon(True)t.start()def run(self):# 把待处理的url加入队列url_t = threading.Thread(target=self.add_url_to_queue)url_t.start()# 开启获取页面数据线程self.run_count_use(self.add_page_to_queue, 3)# 开启处理解析页面数据 self.run_count_use(self.add_data_to_queue, 2)# 开启数据存储线程self.run_count_use(self.save_data, 2)# 使用队列join方法,等待队列数据处理结束self.url_queue.join() # 等待url队列为空时才会往下继续self.page_queue.join() # 等待页面队列为空时才会往下继续self.data_queue.join() # 等待数据队列为空时才会往下继续if __name__ == '__main__':import timestart = time.time()CrawlData().run()end = time.time()print('总共用时:{}'.format((end-start)))

多进程爬取糗百

实现步骤

开启一个进程池Pool,一般是基于cpu个数直接遍历循环待采集的url,并使用apply_async开启异步进程按找需求解析源码,获取想要采集的数据

源码:

from multiprocessing import Poolimport jsonimport requestsimport osimport timefrom lxml import etreeclass CrawlData:def __init__(self, url):self.headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36'}self.url = urldef get_page_data(self):print('当前子进程{}'.format(os.getpid()))page_data = requests.get(self.url, headers=self.headers)element = etree.HTML(page_data.text)div_s = element.xpath('//div[@class="recommend-article"]/ul/li')# 遍历分组列表, 再使用xpath获取内容dz_list = []for div in div_s:item = {}item['head_url'] = self.get_first_element(div.xpath('.//a[@class="recmd-user"]/img/@src'))if item['head_url'] is not None:item['head_url'] = 'http:' + item['head_url']# 昵称item['author_name'] = self.get_first_element(div.xpath('.//a[@class="recmd-user"]/span/text()'))dz_list.append(item)self.save_data(dz_list)def get_first_element(self, list):'''获取列表中第一个元素,如果是空列表就返回None'''return list[0] if len(list) != 0 else Nonedef save_data(self, data):# 从已有的数据中获取一个处理后的数据进行存储with open('qiubai_process.json', 'a', encoding='utf8') as f:for dt in data:json.dump(dt, f, ensure_ascii=False)f.write('\n')# 采集入口函数def crawl(url):CrawlData(url).get_page_data()if __name__ == '__main__':start = time.time()url_pattern = '/8hr/page/{}/'p = Pool(4) # 不传参默认以最大cpu数作为进程数for i in range(1,14):url = url_pattern.format(str(i))p.apply_async(crawl, args=(url,))p.close()p.join()end = time.time()print(f'总共用时:{(end-start)}')

运行结果:

单进程 Pool(1):

当前子进程10356当前子进程10356当前子进程10356当前子进程10356当前子进程10356当前子进程10356当前子进程10356当前子进程10356当前子进程10356当前子进程10356当前子进程10356当前子进程10356当前子进程10356总共用时:13.325877666473389

双进程 Pool(2):

当前子进程3320当前子进程9492当前子进程3320当前子进程9492当前子进程9492当前子进程3320当前子进程9492当前子进程3320当前子进程3320当前子进程9492当前子进程3320当前子进程9492当前子进程3320总共用时:1.3394155502319336

三进程 Pool(3):

当前子进程10908当前子进程9124当前子进程3148当前子进程10908当前子进程9124当前子进程3148当前子进程3148当前子进程9124当前子进程10908当前子进程9124当前子进程3148当前子进程10908当前子进程3148总共用时:1.1449363231658936

四进程 Pool(4):

当前子进程11208当前子进程7944当前子进程9188当前子进程9916当前子进程9916当前子进程11208当前子进程9188当前子进程7944当前子进程9916当前子进程7944当前子进程11208当前子进程9188当前子进程11208总共用时:0.9803769588470459

下面再来个爬取知乎的例子

采集知乎一定要登录才可以,否则无法正常获取数据,本文采用cookies携带登录信息请求数据,具体之前文章《Python爬虫之利用cookies跳过登陆验证码》已经介绍过。其中cookies一定要用等登陆后从浏览器里面获取,并用于请求中,不然会报以下错误:

{"code":"100010","'message'":"Unsupported auth type oauth"}

多进程采集知乎

from multiprocessing import Poolimport jsonimport osimport requestsimport timefrom queue import Queueclass CrawlZH(object):def __init__(self, url, cookies):self.url = urlself.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.129 Safari/537.36'}self.cookies = cookiesdef get_json(self):session = requests.session()session.headers = self.headersrequests.utils.add_dict_to_cookiejar(session.cookies, self.cookies)response = session.get(self.url)result = json.loads(response.text)print("当前子进程-{}运行结果:{}".format(os.getpid(), len(result['data'])))def main(url, cookies):# print('当前子进程:{}'.format(os.getpid()))CrawlZH(url, cookies).get_json()if __name__ == '__main__':# 这里cookies是你正常的登陆后从浏览器里面复制出来cookies = {...}url_pattern = '/api/v3/feed/topstory/recommend?desktop=true&page_number={}&limit=6'# 开启多进程采集 start === print('母进程:{}'.format(os.getpid()))start = time.time()p = Pool()for page_number in range(1, 11):url = url_pattern.format(str(page_number))p.apply_async(main, args=(url, cookies,))p.close()p.join()end = time.time()print(f'总共耗时:{(end-start)}')# 多进程 end ==== # 多线程采集 start ===start = time.time()CrawlZHThread(cookies).run()end = time.time()print(f'总共耗时:{(end-start)}')# 多线程 end ====

结果:

多进程结果:母进程:5024当前子进程-6372运行结果:6当前子进程-12984运行结果:6当前子进程-4536运行结果:6当前子进程-12924运行结果:6当前子进程-12984运行结果:6当前子进程-6372运行结果:6当前子进程-12924运行结果:6当前子进程-4536运行结果:6当前子进程-12984运行结果:6当前子进程-6372运行结果:6总共耗时:2.6678614616394043

多线程采集知乎

# 简单 修饰器 用于无限循环def run_forever(func):def wrapper(obj):while True:func(obj)return wrapperclass CrawlZHThread(object):def __init__(self, cookies):self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.129 Safari/537.36'}self.cookies = cookiesself.url_pattern = '/api/v3/feed/topstory/recommend?desktop=true&page_number={}&limit=6'self.url_queue = Queue()def add_url_to_queue(self):for i in range(1, 11):self.url_queue.put(self.url_pattern.format(str(i)))def get_json(self):while True:url = self.url_queue.get()session = requests.session()session.headers = self.headersrequests.utils.add_dict_to_cookiejar(session.cookies, self.cookies)response = session.get(url)result = json.loads(response.text)print("当前子线程-{}运行结果:{}".format(threading.current_thread().name, len(result['data'])))self.url_queue.task_done()@run_foreverdef get_json_01(self):url = self.url_queue.get()session = requests.session()session.headers = self.headersrequests.utils.add_dict_to_cookiejar(session.cookies, self.cookies)response = session.get(url)result = json.loads(response.text)print("当前子线程-{}运行结果:{}".format(threading.current_thread().name, len(result['data'])))self.url_queue.task_done()def run(self):url_t = threading.Thread(target=self.add_url_to_queue)url_t.start()# 开启多线程for i in range(4):t = threading.Thread(target=self.get_json)t.setDaemon(True)t.start()self.url_queue.join()if __name__ == '__main__':# 这里cookies是你正常的登陆后从浏览器里面复制出来cookies = {...}# 多线程采集 start ===start = time.time()CrawlZHThread(cookies).run()end = time.time()print(f'总共耗时:{(end-start)}')# 多线程 end ====

结果:

多线程结果:当前子线程-Thread-3运行结果:6当前子线程-Thread-4运行结果:6当前子线程-Thread-5运行结果:6当前子线程-Thread-2运行结果:6当前子线程-Thread-3运行结果:6当前子线程-Thread-4运行结果:6当前子线程-Thread-5运行结果:6当前子线程-Thread-2运行结果:6当前子线程-Thread-3运行结果:6当前子线程-Thread-4运行结果:6总共耗时:2.4344863891601562

其中get_json与get_json_01结果是一样的,只不过一个用了修饰器,用修饰器可以提高可读性

对比以上两种采集方式,时间差不多,我只做获取页面数据,没有做后续数据处理,所以没有显著差异!!

注意本文的创作时间,后期页面源码调整,可能将采集不到数据,也属正常现象,本文主要目的是提供多进程、多线程爬虫的思想逻辑,仅供参考!!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。