
Using MySQL from a crawler: creating a new database and performing insert, delete, update, and query operations

Posted: 2024-04-14 00:19:12


Preface:

This post is based on "Python3使用mysql创建新数据库并进行增删改查操作" (Python3: create a new MySQL database and perform CRUD operations), /Anwel/article/details/79919646.

This article walks through a crawler that scrapes data and stores it in MySQL, automatically creating the database and table as needed. Feedback on any shortcomings is welcome.
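Before the full crawler, here is a minimal sketch of the core idea: automatically create the database and table, then do basic insert/query/update/delete with pymysql. The connection parameters and the names demo_db and announcements are illustrative assumptions, not values from the original article; the full crawler below applies the same idea, deriving its columns from the keys of the scraped data dict.

# Minimal sketch: auto-create a database and table, then basic CRUD with pymysql.
# All connection parameters and names (demo_db, announcements) are assumptions.
import pymysql

conn = pymysql.connect(host="localhost", user="root", password="mysql", charset="utf8mb4")
cursor = conn.cursor()

# Create the database if it does not exist, then switch to it.
cursor.execute("CREATE DATABASE IF NOT EXISTS demo_db DEFAULT CHARACTER SET utf8mb4")
cursor.execute("USE demo_db")

# Create a table whose columns mirror the keys of the scraped data dict.
cursor.execute("""
    CREATE TABLE IF NOT EXISTS announcements (
        id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
        title TEXT,
        issue_time TEXT
    ) DEFAULT CHARSET=utf8mb4
""")

# Insert (parameterized so pymysql handles quoting), then query, update, delete.
cursor.execute("INSERT INTO announcements (title, issue_time) VALUES (%s, %s)",
               ("sample title", "2018-04-12"))
conn.commit()

cursor.execute("SELECT id, title FROM announcements WHERE title = %s", ("sample title",))
print(cursor.fetchall())

cursor.execute("UPDATE announcements SET issue_time = %s WHERE title = %s",
               ("2018-04-13", "sample title"))
cursor.execute("DELETE FROM announcements WHERE title = %s", ("sample title",))
conn.commit()

cursor.close()
conn.close()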

import re
import time
import random
import uuid
from functools import reduce
from io import BytesIO

import pymysql
import requests
from PIL import Image
from bs4 import BeautifulSoup
from lxml import etree
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
# `agents` is expected to be a list of User-Agent strings provided by a local
# fake_useragent module; the pip package of the same name exposes a UserAgent class instead.
from fake_useragent import agents


class CrackTouClick():

    def __init__(self):
        # Relative path of the bulletin list page; the target site's domain was omitted in the source article.
        self.url = '/V2PRTS/AmendBulletinInfoListInit.do'
        self.browser = webdriver.Chrome()
        # self.browser.maximize_window()
        self.wait = WebDriverWait(self.browser, 20)
        # self.username = USERNAME
        # self.password = PASSWORD
        # self.chaojiying = Chaojiying(CHAOJIYING_USERNAME, CHAOJIYING_PASSWORD, CHAOJIYING_SOFT_ID)
        self.session = requests.Session()

    def __del__(self):
        self.browser.close()

    def open(self):
        """
        Open the list page and grab its source.
        :return: page html and the cookie string
        """
        self.browser.get(self.url)
        # print(self.browser.page_source)
        # Wait until the first data row is present.
        start_time = self.wait.until(EC.presence_of_element_located(
            (By.XPATH, '//*[@id="datagrid-row-r1-1-0"]/td[2]/div')))
        print(start_time.text)
        if start_time.text:
            # Collect the cookies from the Selenium session so requests can reuse them.
            cookie_list = self.browser.get_cookies()
            # print(cookie_list)
            # cookie_dict = {cookie["name"]: cookie["value"] for cookie in cookie_list}
            cookie = [item["name"] + "=" + item["value"] for item in cookie_list]
            # print(cookie)
            cookiestr = '; '.join(item for item in cookie)
            # print(self.browser.page_source)
            html = self.browser.page_source
            return html, cookiestr

    def resolve_html(self, html, cookiestr):
        """
        Parse the list page.
        :param html: page source
        :return: list of data dicts
        """
        html_data = etree.HTML(html)
        # The grid renders each record across two tables, so the row list holds both halves.
        table_data_list = html_data.xpath('//table[@class="datagrid-btable"]/tbody/tr')
        # Length of the row list
        data_len = len(table_data_list)
        # print(data_len)
        # List that collects one dict per announcement
        data_dict_list = []
        for i in range(0, data_len // 2):
            data_dict = {}
            # print(i)
            table_1 = table_data_list[i]
            # Name of the amendment bulletin
            amendBulletinName = table_1.xpath('./td[2]/div/a/text()')[0]
            # Link of the amendment bulletin (taken from the onclick attribute)
            amendBulletinName_links = table_1.xpath('./td[2]/div/a/@onclick')[0]
            # print(tenderPrjName_links)
            param_str = re.findall(r"\?(.*)", amendBulletinName_links)[0].split("'")[0]
            amendBulletinName_link = self.url + "?" + param_str
            # Amendment type
            try:
                amendType = table_1.xpath('./td[3]/div/a/span/text() | ./td[3]/div/span/text() | ./td[3]/div/text()')[0]
            except Exception as e:
                print(e)
                amendType = ""
            table_2 = table_data_list[data_len // 2 + i]
            # Publish time of the amendment bulletin
            amendBulletinIssueTime = table_2.xpath('./td[1]/div/text()')[0]
            # Code of the original bulletin
            try:
                originalBulletinCode = table_2.xpath('./td[2]/div/text()')[0]
            except Exception as e:
                print(e)
                originalBulletinCode = ""
            # print("111")
            # Fetch the html behind the link
            amendBulletinName_link_html = self.resolve_tenderPrjName_link(amendBulletinName_link, cookiestr)
            # print(type(tenderPrjName_link_html))
            data_dict['amendBulletinName'] = amendBulletinName
            data_dict['amendBulletinName_link'] = amendBulletinName_link
            data_dict['amendType'] = amendType
            data_dict['amendBulletinIssueTime'] = amendBulletinIssueTime
            data_dict['originalBulletinCode'] = originalBulletinCode
            data_dict['amendBulletinName_link_html'] = amendBulletinName_link_html
            # print(data_dict)
            data_dict_list.append(data_dict)
        # print(data_dict_list)
        return data_dict_list

    def resolve_tenderPrjName_link(self, tenderPrjName_link, cookiestr):
        """
        Fetch the content behind a bulletin link.
        :return: the detail-page fragment
        """
        try:
            user_agent = random.choice(agents)
            # print(tenderPrjName_link)
            headers = {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
                'Accept-Language': 'zh-CN,zh;q=0.9',
                'Connection': 'keep-alive',
                'Cookie': cookiestr,
                'Host': '',  # host omitted in the source article; fill in the target site's domain
                'Referer': '/V2PRTS/AmendBulletinInfoListInit.do',
                # 'Referer': '/V2PRTS/WinningPublicityInfoListInit.do',
                'User-Agent': user_agent,
            }
            res = self.session.get(tenderPrjName_link, headers=headers)
            res.encoding = 'utf-8'
            # TODO: check whether this really is the logged-in page
            # print(res.text)
            if res.status_code == 200:
                dict_data = res.text
                # print(dict_data)
                soup = BeautifulSoup(dict_data, "lxml")
                contents = soup.find('div', class_="trading_publicly_fr fr")
                # print(contents)
                return contents
            else:
                # Cookie expired: re-fetch cookies from the browser and retry
                cookie_list = self.browser.get_cookies()
                # print(cookie_list)
                # cookie_dict = {cookie["name"]: cookie["value"] for cookie in cookie_list}
                cookie = [item["name"] + "=" + item["value"] for item in cookie_list]
                # print(cookie)
                cookiestrs = '; '.join(item for item in cookie)
                # print(cookiestr)
                return self.resolve_tenderPrjName_link(tenderPrjName_link, cookiestrs)
        except Exception as e:
            print("Failed to fetch link: {}".format(e))

    def next_page(self, db, cursor, cookiestr, table_name, page_n):
        """
        Page through the grid, parsing and inserting every page.
        :return:
        """
        # find_element_by_xpath is the Selenium 3 style locator API.
        page_all = self.browser.find_element_by_xpath(
            '//div[@class="datagrid-pager pagination"]/table/tbody/tr/td[8]/span').text
        page = re.findall(r'共(\d+)页', page_all)[0]
        print(page_all)
        for i in range(page_n, int(page) + 1):
            print('Crawling page {}'.format(i))
            self.browser.find_element_by_xpath(
                '//div[@class="datagrid-pager pagination"]/table/tbody/tr/td[7]/input').clear()
            self.browser.find_element_by_xpath(
                '//div[@class="datagrid-pager pagination"]/table/tbody/tr/td[7]/input').send_keys(i, Keys.ENTER)
            time.sleep(0.5)
            pages = self.browser.find_element_by_xpath("//div[@class='pagination-info']").text
            # print(pages)
            page_1 = re.findall(r"(\d+)", pages)[0]
            page_2 = re.findall(r"(\d+)", pages)[1]
            pa = int(page_2) - int(page_1) + 1
            # print(pa)
            # print(type(pa))
            page_4 = str((i - 1) * 10 + pa)
            print(page_4)
            print(type(page_4))
            # Only parse once the pagination info shows the expected last row number.
            if page_2 == page_4:
                # WebDriverWait(self.browser, 30).until(
                #     EC.text_to_be_present_in_element(
                #         (By.XPATH, '//*[@id="datagrid-row-r1-1-9"]/td[1]/div'), str((i - 1) * 10 + pa)))
                try:
                    html = self.browser.page_source
                    data_dict_list = self.resolve_html(html, cookiestr)
                    # Insert this page's data
                    self.insertData(db, cursor, data_dict_list, table_name)
                except Exception as e:
                    print('Exception occurred, please debug the code, err: {}'.format(e))

    def createTable(self, cursor, data_dict_list, table_name):
        """
        Create the table, using the keys of the first scraped dict as column names.
        :param cursor:
        :param data_dict_list:
        :return:
        """
        try:
            # Create the wuhan_zhaobiao database, dropping it first if it already exists
            # cursor.execute("drop database if exists wuhan_zhaobiao")
            # cursor.execute("create database wuhan_zhaobiao")
            # Switch to the wuhan_zhaobiao database
            # cursor.execute("use wuhan_zhaobiao")
            key_list = []
            for key in data_dict_list[0].keys():
                # print(key)
                sql_1 = "{0} text".format(key)
                key_list.append(sql_1)
            t = reduce(lambda x, y: str(x) + "," + str(y), key_list)
            # print(t)
            # sql = """CREATE TABLE EMPLOYEE ({0})""".format(t)
            # print(sql)
            # sql = """CREATE TABLE IF NOT EXISTS Students (
            #     ID CHAR(10) NOT NULL,
            #     Name CHAR(8),
            #     Grade INT)"""
            # The statement below creates the table
            sql = """CREATE TABLE {0} (id INT NOT NULL AUTO_INCREMENT, primary key(id), {1}) DEFAULT CHARSET=utf8""".format(table_name, t)
            # # Drop the student table if it exists
            # cursor.execute("drop table if exists student")
            # Create the table
            cursor.execute(sql)
            print("successfully create table")
        except Exception as e:
            print("Failed to create table, err: {}".format(e))

    def insertData(self, db, cursor, data_dict_list, table_name):
        """
        Insert the scraped data into the table.
        :return:
        """
        for data_dict in data_dict_list:
            # print(data_dict)
            key_list = []
            value_list = []
            for key, value in data_dict.items():
                # print(key)
                key_list.append(key)
                value_list.append(str(value).strip())
            # print(key_list)
            t_keys = reduce(lambda x, y: str(x) + "," + str(y), key_list)
            # t_values = reduce(lambda x, y: x + "," + y, value_list)
            # print(value_list)
            # print(len(value_list))
            # Note: formatting the tuple repr into the SQL breaks if a value contains quotes.
            sql = """INSERT INTO {0} ({1}) VALUES{2};""".format(table_name, t_keys, tuple(value_list))
            try:
                # Execute the statement
                cursor.execute(sql)
                # Commit the transaction
                db.commit()
                print("successfully insert data")
            except Exception as e:
                print("Failed to insert data, err: {}".format(e))
                # Roll back on error
                db.rollback()

    def table_exists(self, cursor, table_name):
        """
        Check whether the table already exists.
        :param table_name:
        :return:
        """
        sql = "show tables;"
        cursor.execute(sql)
        tables = [cursor.fetchall()]
        table_list = re.findall('(\'.*?\')', str(tables))
        table_list = [re.sub("'", '', each) for each in table_list]
        if table_name in table_list:
            # Return 1 if it exists
            return 1
        else:
            # Return 0 if it does not
            return 0

    def crack(self):
        """
        Entry point.
        :return:
        """
        # Connect to MySQL; "xxx" is the placeholder database name used in the source article
        db = pymysql.connect(host="localhost", user="root", password="mysql",
                             database="xxx", charset="utf8")
        # Create a cursor
        cursor = db.cursor()
        # Get the HTML page
        html, cookiestr = self.open()
        # Parse the HTML page and extract the data
        data_dict_list = self.resolve_html(html, cookiestr)
        # print(data_dict_list)
        table_name = str(input("Enter the table name: "))
        # Check whether the table already exists in the database
        table_n = self.table_exists(cursor, table_name)
        # Create it if it does not
        if table_n != 1:
            # Create the table
            self.createTable(cursor, data_dict_list, table_name)
            # Insert the first page's data
            self.insertData(db, cursor, data_dict_list, table_name)
            # Page through the rest
            self.next_page(db, cursor, cookiestr, table_name, 2)
        else:
            # The run probably stopped halfway; resume paging from a given page
            page_n = int(input("Enter the page number to resume from: "))
            self.next_page(db, cursor, cookiestr, table_name, page_n)


if __name__ == '__main__':
    crack = CrackTouClick()
    crack.crack()
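One caveat about insertData above: interpolating tuple(value_list) straight into the SQL string breaks as soon as a value contains a quote character (the stored HTML fragment in amendBulletinName_link_html usually does) and leaves the statement open to injection. Below is a sketch of a safer variant, not the original author's code, that keeps the dynamic column list but passes the values through pymysql's %s placeholders.

def insertData(self, db, cursor, data_dict_list, table_name):
    """Insert each scraped dict as one row, letting pymysql escape the values."""
    for data_dict in data_dict_list:
        keys = list(data_dict.keys())
        values = [str(v).strip() for v in data_dict.values()]
        # Identifiers (table and column names) cannot be parameterized, so they should
        # still be validated or whitelisted; only the values go through placeholders.
        columns = ",".join(keys)
        placeholders = ",".join(["%s"] * len(keys))
        sql = "INSERT INTO {0} ({1}) VALUES ({2})".format(table_name, columns, placeholders)
        try:
            cursor.execute(sql, values)
            db.commit()
            print("successfully insert data")
        except Exception as e:
            print("Failed to insert data, err: {}".format(e))
            db.rollback()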
