定时爬取全国肺炎疫情信息并制作web接口

Madge ·
更新时间:2024-11-13
· 587 次阅读

定时爬取全国肺炎疫情信息并制作web接口

使用腾讯接口:https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5
数据库:MySQL
库:request
语言:python 3.7
因为边爬边存的过程中出现错误,所以换思路,先爬下来存储为json格式,然后解析json,再进行数据的处理。

#coding=gbk import requests import json import flask import time import pymysql import re from flask import request #获取参数 cities = [] # 腾讯数据接口获取json格式疫情数据 def get_ncp_data(): url = 'https://view.inews.qq.com/g2/getOnsInfo?name=disease_h5' requests.adapters.DEFAULT_RETRIES = 5 data = requests.get(url,timeout = 500).json()['data'] return data # 扁平化中国疫情数据 def flatten_ncp_data(): all = json.loads(get_ncp_data()) # 初始化结果链表 # cities = [] # 数据最新更新时间 date = all['lastUpdateTime'] # 第一层:国家 china = all['areaTree'][0]['children'] # 获得中国数据 # 第二层:省 for province in china: province_ncp = province['children'] # 第三层:市 for city in province_ncp: # 输出格式 city_ncp = { '日期': date, '省份': province['name'], '市': city['name'], '新增确认': city['today']['confirm'], # '新增治愈': city['today']['heal'], # '新增死亡': city['today']['dead'], '累计确认': city['total']['confirm'], '累计治愈': city['total']['heal'], '累计死亡': city['total']['dead'] } cities.append(city_ncp) # print(cities) return cities #存储为json def saver(): number = flatten_ncp_data() file_name = 'E:\竞赛\大三下学期\中国大学生计算机设计大赛(2)\需提交\源码\spider\information.json' #通过扩展名指定文件存储的数据为json格式 with open(file_name,'w') as file_object: json.dump(number,file_object) # 读json(转为数组) def reader(): file_name = 'E:\竞赛\大三下学期\中国大学生计算机设计大赛(2)\需提交\源码\spider\information.json' with open(file_name,'r') as file_object: contents = json.load(file_object) # print(contents) return contents # 创建表 def clear_table(con,table_name): sql = "show tables;" #第一次使用需要将数据表删除 con.execute(sql) tables = [con.fetchall()] table_list = re.findall('(\'.*?\')',str(tables)) table_list = [re.sub("'",'',each) for each in table_list] if table_name in table_list: return 1 else: return 0 # 向表中添加数据 def add_table(): connect = pymysql.connect( user = 'root', password = 'root', db = 'MYSQL', host = '127.0.0.1', port = 3306, charset = 'utf8' ) con = connect.cursor() con.execute("use p_information") # 使用数据库 table_name = 'information' # 表名 if(clear_table(con,table_name) != 1): sql = '''create table information(city varchar(11),province varchar(11), new_confirm varchar(11),total_confirm varchar(13),heal varchar(11),dead varchar(11))''' con.execute(sql) print(len(reader())) for i in range(len(reader())): city = reader()[i]['市'] province = reader()[i]['省份'] new_confirm = reader()[i]['新增确认'] total_confirm = reader()[i]['累计确认'] heal = reader()[i]['累计治愈'] dead = reader()[i]['累计死亡'] print("******************") print(city) print(province) print(new_confirm) print(total_confirm) print(heal) print(dead) con.execute('insert into information(city,province,new_confirm,total_confirm,heal,dead)values(%s,%s,%s,%s,%s,%s)', [city,province,new_confirm,total_confirm,heal,dead]) print("finish!!!!!!!!!!!!!!!!!!!!!!!") connect.commit() con.close() connect.close() # 数据库查询 def conn_mysql(sql): import MySQLdb conn = MySQLdb.connect(user='root', password='root', database='p_information',charset='utf8') #创建游标 指定查询字段名 cur = conn.cursor(cursorclass = MySQLdb.cursors.DictCursor) #这里 cur.execute(sql) res = cur.fetchone() #print(res) conn.commit() cur.close() conn.close() return res server = flask.Flask(__name__) #创建一个flask对象 @server.route('/login', methods=['get','post']) def query(): sql = 'select * from information' data = conn_mysql(sql) return '{"msg":"成功"}' server.run(port=8000,debug=True) # 每n秒执行一次 def timer(n): while True: saver() add_table() query() time.sleep(n) # main if __name__ == "__main__": # # 10min timer(10*60)

在这里插入图片描述
在这里插入图片描述


作者:super_xxt



Web 肺炎 疫情

需要 登录 后方可回复, 如果你还没有账号请 注册新账号
相关文章