当前位置: 首页 > article >正文

创建一个异步爬虫并将数据存入excel

事例网站链接: https://xk.scjgj.sh.gov.cn/xzxk_wbjg/#/tzdwSYDJList

一.数据获取流程

1️⃣对列表页请求获取有关详情页的字段值
2️⃣构造详情页的URL获取详情页数据
3️⃣将数据存入excel

二.异步代码

import asyncio
import logging
import pandas as pd
from aiohttp import ClientSession
from httpx._urlparse import quote
from openpyxl import Workbook
from openpyxl.utils.dataframe import dataframe_to_rows
import ssl

# 设置日志
logging.basicConfig(level=logging.INFO)

# 请求头和数据定义
url_template = 'https://xk.scjgj.sh.gov.cn/xzxk_wbjg/query/public/sydjQueryDeviceEtInfo'
url_template2 = 'https://xk.scjgj.sh.gov.cn/xzxk_wbjg/query/public/useLicInfo/{}/{}'
headers = {
    'Accept': 'application/json, text/plain, */*',
    'Accept-Language': 'zh-CN,zh;q=0.9',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Content-Type': 'application/json;charset=UTF-8',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36',
}

# 初始化DataFrame
columns = ['applyId', '设备名称', '设备种类', '注册代码', '证书编号', '维保单位', '制造单位', '使用单位', '产品编号',
           '单位内编号', '使用单位地址', '发证日期', '登记机关', '设备类别']
df = pd.DataFrame(columns=columns)

# 创建一个新的工作簿用于保存数据
wb = Workbook()
ws = wb.active



# 定义一个锁来确保在保存Excel时没有其他操作
save_lock = asyncio.Lock()

# 并发请求的限制
concurrency_limit = 8  # 限制并发请求数量为32

# Semaphore用于控制并发数
semaphore = asyncio.Semaphore(concurrency_limit)

# 创建一个忽略SSL验证的上下文
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE


# 协程函数用于发送HTTP请求
async def fetch(session, url, data):
    async with semaphore:  # 控制并发数
        logging.warning(f"send URL: {url}")
        async with session.post(url, json=data, headers=headers, ssl=context) as response:
            content_type = response.headers.get('Content-Type', '')
            if 'application/json' in content_type:
                return await response.json()
            else:
                logging.warning(f"Unexpected Content-Type: {content_type} for URL: {url}")
                return None


# 协程函数用于处理一页数据
async def process_page(page):
    global df
    logging.info(f"start processing page {page}.")
    data = {"rows": 50, "zszl": "00206", "page": page}
    async with ClientSession() as session:
        res1 = await fetch(session, url_template, data)
        if res1 is None:
            logging.warning(f"Failed to fetch initial data for page {page}.")
            return

        resultList = res1['data']['resultList']
        tasks = []

        for item in resultList:
            encoded_lic_unique_id = quote(item['licUniqueId'])
            url2 = url_template2.format(item['applyId'], encoded_lic_unique_id)
            task = asyncio.create_task(fetch(session, url2, {}))
            tasks.append(task)

        results = await asyncio.gather(*tasks)
        for result, item in zip(results, resultList):
            if result is None:
                logging.warning(f"Failed to fetch secondary data for applyId {item['applyId']}.inpage:{page}")
                row = pd.DataFrame({
                    'applyId': [item['applyId']],
                    '设备名称': [item['devName']],
                    '设备种类': [item['devSuperclass']],
                    '注册代码': [item['deviceCode']],
                    '证书编号': [item['useLicNo']],
                    '维保单位': [item['maintainComName']],
                    '制造单位': [item['makeComName']],
                    '使用单位': [item['useComName']],
                    '产品编号': None,
                    '单位内编号': None,
                    '使用单位地址': None,
                    '发证日期': None,
                    '登记机关': None,
                    '设备类别': None
                })
                df = pd.concat([df, row], ignore_index=True)
                continue

            row = pd.DataFrame({
                'applyId': [item['applyId']],
                '设备名称': [item['devName']],
                '设备种类': [item['devSuperclass']],
                '注册代码': [item['deviceCode']],
                '证书编号': [item['useLicNo']],
                '维保单位': [item['maintainComName']],
                '制造单位': [item['makeComName']],
                '使用单位': [item['useComName']],
                '产品编号': [result['data'].get('productCode')],
                '单位内编号': [result['data'].get('innerCode')],
                '使用单位地址': [result['data'].get('usePlace')],
                '发证日期': [result['data'].get('qfsj')],
                '登记机关': [result['data'].get('fzjgmc')],
                '设备类别': [result['data'].get('devSubclass')]
            })
            df = pd.concat([df, row], ignore_index=True)
        logging.info(f"Finished processing page {page}.")
        


# 异步保存数据函数
async def save_data(df):
    global ws
    async with save_lock:
        try:
            logging.info("Saving data.")

            # 等待当前所有任务完成
            pending_tasks = [task for task in asyncio.all_tasks() if task is not asyncio.current_task()]
            if pending_tasks:
                logging.info(f"Waiting for {len(pending_tasks)} tasks to complete before saving.")
                await asyncio.gather(*pending_tasks)

            # 先保存数据
            for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=ws.max_row == 1), start=1):
                for c_idx, value in enumerate(row, start=1):
                    ws.cell(row=r_idx, column=c_idx, value=value)

            # 保存Excel文件
            wb.save('./yb.xlsx')

            # 清空DataFrame
            df.drop(df.index, inplace=True)

            logging.info("Data saved.")
        except Exception as e:
            logging.error(f"Failed to save data: {e}")

# 异步主函数
async def main():
    global df, last_saved_page
    logging.warning("start")
    pages = list(range(8000, 9000))
    tasks = [process_page(page) for page in pages]
    logging.warning("listtaskend")
    await asyncio.gather(*tasks)

    # 检查是否有剩余数据需要保存
    if not df.empty:
        await save_data(df)

    # 确保所有任务都已完成
    pending_tasks = [task for task in asyncio.all_tasks() if task is not asyncio.current_task()]
    if pending_tasks:
        logging.info(f"Waiting for {len(pending_tasks)} final tasks to complete.")
        await asyncio.gather(*pending_tasks)

    # 释放锁
    save_lock.release()


# 运行异步主函数
if __name__ == '__main__':
    asyncio.run(main())

http://www.kler.cn/news/360036.html

相关文章:

  • redis—cluster集群
  • 在C++中,使用基于range的for循环迭代range
  • Meta因称其AI模型Llama为“开源” 遭炮轰,被指“污染” 开源术语
  • Nature 正刊丨年轻的小行星家族是陨石的主要来源
  • [DICOM活久见-2]认识DICOM的多帧图像,并且用pydicom拆分为单帧图像
  • C++学习路线(十九)
  • ReactNative项目根据平台去判断允许用户是热更新还是强更新或者若更新
  • docker基础使用创建固定硬盘大小为40G的虚拟机
  • qt继承结构
  • yolo自动化项目实例解析(八)自建UI-键鼠录制回放
  • linux主机定时发送邮件(s-nail)
  • 不常用的css合集
  • 从网络请求到Excel:自动化数据抓取和保存的完整指南
  • 【设计模式七大设计原则】
  • 网络相关(HTTP/TCP/UDP/IP)
  • 【VUE小型网站开发】优化通用配置
  • Python爬虫:获取去哪儿网目的地下的景点数据
  • Java 解决阿里云OSS服务器私有权限图片通过URL无法预览的问题
  • 【Linux】实验:mkdir 命令 、 tee 命令
  • 保研推荐信模板