当前位置: 首页 > article >正文

农产品价格报告爬虫使用说明

农产品价格报告爬虫使用说明

# **************************************************************************
# *                                                                          *
# *                      农产品价格报告爬虫                                   *
# *                                                                          *
# *                        作者: xiaohai                                     *
# *                        版本: v1.0.0                                      *
# *                        日期: 2024-12-05                                  *
# *                                                                          *
# *        功能说明:                                                         *
# *            1. 日度报告                                                   *
# *               - 生成今日分析报告                                         *
# *               - 生成指定日期报告                                         *
# *               - 包含价格指数、分品类分析等                                *
# *                                                                          *
# *            2. 周度报告                                                   *
# *               - 生成本周分析报告                                         *
# *               - 生成指定周报告                                           *
# *               - 汇总周内价格变化                                         *
# *                                                                          *
# *            3. 价格走势                                                   *
# *               - 农产品价格200指数走势                                    *
# *               - 猪肉价格全国走势                                         *
# *               - 猪肉价格区域走势                                         *
# *               - 粮油价格指数走势                                         *
# *                                                                          *
# *            4. 数据导出                                                   *
# *               - 支持Excel格式导出                                        *
# *               - 包含多个数据分类                                         *
# *               - 支持时间范围选择                                         *
# *                                                                          *
# *        : 农业农村部市场信息中心                                   *
# *        版权声明: 仅用于学习交流                                          *
# *                                                                          *
# **************************************************************************

import os
import json
import logging
import requests
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import pandas as pd
import warnings
import urllib3
import sys
import subprocess
import pkg_resources
from bs4 import BeautifulSoup
import re
import time

# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings('ignore')

# 配置常量
VERSION = 'v1.0.0'
AUTHOR = 'xiaohai'
DATA_SOURCE = '农业农村部市场信息中心'

# API配置
API_BASE_URL = 'https://ncpscxx.moa.gov.cn'
API_ENDPOINTS = {
    'price_index': '/product/common-price-index/getIndexList',
    'variety_list': '/product/sys-variety/selectList',
    'price_trend': '/product/price-info/getPriceInfoList',
    'market_list': '/product/sys-market/selectList',
    'daily_price': '/product/price-info/getDailyPrice',
    'analysis_report': '/product/analysis-report/pageList'
}

# 输出目录配置
OUTPUT_DIRS = {
    'base': 'reports',
    'daily': 'reports/daily',
    'weekly': 'reports/weekly'
}

# 图表样式配置
CHART_STYLE = {
    'figure': {
        'figsize': (12, 6),
        'facecolor': '#f8fcfa'
    },
    'grid': {
        'linestyle': '--',
        'alpha': 0.3,
        'color': 'gray'
    },
    'line': {
        'marker': 'o',
        'markersize': 4,
        'linewidth': 2
    },
    'colors': {
        'blue': '#40a9ff',
        'green': '#73d13d',
        'orange': '#ffa940',
        'red': '#ff4d4f',
        'purple': '#9254de',
        'cyan': '#36cfc9'
    }
}

def check_and_install_packages():
    """检查并安装所需的包"""
    required_packages = {
        'requests': 'requests',      # HTTP请求
        'pandas': 'pandas',          # 数据处理
        'matplotlib': 'matplotlib',  # 绘图支持
        'urllib3': 'urllib3',        # HTTP客户端
        'openpyxl': 'openpyxl',     # Excel支持
        'colorama': 'colorama'       # 控制台颜色
    }
    
    print("\n" + "="*50)
    print("检查并安装依赖包...")
    print("="*50)
    
    try:
        import colorama
        colorama.init()
        success_mark = colorama.Fore.GREEN + "✓" + colorama.Style.RESET_ALL
        error_mark = colorama.Fore.RED + "✗" + colorama.Style.RESET_ALL
    except ImportError:
        success_mark = "✓"
        error_mark = "✗"
    
    all_success = True
    for package, import_name in required_packages.items():
        try:
            pkg_resources.require(package)
            print(f"{success_mark} {package:15} 已安装")
        except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict):
            print(f"{error_mark} {package:15} 未安装,正在安装...")
            try:
                subprocess.check_call([
                    sys.executable, 
                    "-m", 
                    "pip", 
                    "install", 
                    "--disable-pip-version-check",
                    "--no-cache-dir",
                    package
                ], stdout=subprocess.DEVNULL)
                print(f"{success_mark} {package:15} 安装成功")
            except Exception as e:
                print(f"{error_mark} {package:15} 安装失败: {str(e)}")
                all_success = False
    
    print("\n依赖包检查" + ("全部完成" if all_success else "存在问题"))
    print("="*50 + "\n")
    
    if not all_success:
        print("某些依赖包安装失败,程序能无法正常运行!")
        if input("是否继续运行?(y/n): ").lower() != 'y':
            sys.exit(1)

class ReportCrawler:
    """农产品价格报告爬虫"""
    
    def __init__(self):
        # 禁用SSL警告
        warnings.filterwarnings('ignore')
        
        # 基础配置
        self._setup_directories()
        self._setup_logger()
        self._setup_api()
        
    def _setup_directories(self):
        """创建输出目录"""
        self.output_dir = "reports"
        self.daily_dir = os.path.join(self.output_dir, "daily")
        self.weekly_dir = os.path.join(self.output_dir, "weekly")
        
        for d in [self.output_dir, self.daily_dir, self.weekly_dir]:
            if not os.path.exists(d):
                os.makedirs(d)
                
    def _setup_logger(self):
        """配置日志"""
        log_file = os.path.join("logs", f"crawler_{datetime.now().strftime('%Y%m%d')}.log")
        os.makedirs("logs", exist_ok=True)
        
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        
        # 文件处理器
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setFormatter(formatter)
        
        # 制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        
        # 配置日志器
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
        
    def _setup_api(self):
        """配置API"""
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
            'Origin': 'https://ncpscxx.moa.gov.cn',
            'Referer': 'https://ncpscxx.moa.gov.cn/',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Content-Type': 'application/json;charset=UTF-8'
        }

    def show_menu(self):
        """显示功能菜单"""
        menu = """
农产品价格报告爬虫系统
====================
1. 成今日分析报告
2. 生成本周分报告
3. 生成指定日期报告
4. 生成指定周报告
5. 生成价格指数走势图
6. 生成猪肉价格走势图
7. 生成区域价格走势图
8. 生成粮油价格走势图
9. 导出Excel数据
0. 退出系统

请输入���能编号(0-9): """
        print("\n" + "="*50)  # 添加分隔线
        choice = input(menu)
        print("="*50 + "\n")  # 添加分隔线
        return choice

    def run(self):
        """运行系统"""
        while True:
            choice = self.show_menu()
            
            if choice == "0":
                print("感谢使用,再见!")
                break
                
            elif choice == "1":
                print("正在生成今日分析报告...")
                self.generate_daily_report(datetime.now())
                
            elif choice == "2":
                print("正在生成本周分析报告...")
                today = datetime.now()
                self.generate_weekly_report(today.year, int(today.strftime("%W")))
                
            elif choice == "3":
                date_str = input("请输入日期(格式:YYYY-MM-DD): ")
                try:
                    date = datetime.strptime(date_str, "%Y-%m-%d")
                    self.generate_daily_report(date)
                except:
                    print("日期格式错误!")
                    
            elif choice == "4":
                year = int(input("请输入年份: "))
                week = int(input("请输入周数(1-52): "))
                self.generate_weekly_report(year, week)
                
            elif choice == "5":
                days = int(input("请输入要查看的天数: "))
                end = datetime.now()
                start = end - timedelta(days=days)
                self.plot_index_trend(start, end)
                
            elif choice == "6":
                days = int(input("请输入要查看的天数: "))
                end = datetime.now()
                start = end - timedelta(days=days)
                self.plot_pig_price_trend(start, end)
                
            elif choice == "7":
                days = int(input("请输入要查看的天数: "))
                end = datetime.now()
                start = end - timedelta(days=days)
                self.plot_pig_price_region_trend(start, end)
                
            elif choice == "8":
                days = int(input("请输入要查看的天数: "))
                end = datetime.now()
                start = end - timedelta(days=days)
                self.plot_grain_price_trend(start, end)
                
            elif choice == "9":
                days = int(input("请输入要导天数: "))
                end = datetime.now()
                start = end - timedelta(days=days)
                self.export_data(start, end)
                
            else:
                print("无效的选择,请重试!")
            
            input("\n按回车键继续...")

    def _make_request(self, url, method='get', params=None, data=None):
        """发送HTTP请求
        
        Args:
            url: 请求URL
            method: 请求方法,支持 'get'/'post'
            params: URL参数
            data: POST数据
            
        Returns:
            Response对象或None(请求失败)
        """
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
            }
            
            if method.lower() == 'get':
                response = requests.get(
                    url,
                    params=params,
                    headers=headers,
                    verify=False,
                    timeout=10
                )
            else:
                response = requests.post(
                    url,
                    params=params,
                    json=data,  # 添加json参数支持
                    headers=headers,
                    verify=False,
                    timeout=10
                )
            
            response.raise_for_status()
            return response
            
        except requests.exceptions.RequestException as e:
            self.logger.error(f"请求失败: {str(e)}")
            return None

    def fetch_daily_report(self, date):
        """获取日度价格报告"""
        try:
            url = f"{API_BASE_URL}/api/FarmDaily/list"
            data = {
                "daylyDate": date.strftime("%Y-%m-%d")
            }
            
            response = self._make_request(url, method='post', data=data)
            if not response:
                return None
            
            data = response.json()
            if data.get("code") == 200 and data.get("content",{}).get("list"):
                # 找到指定日期的报告
                target_date = date.strftime("%Y-%m-%d")
                for report in data["content"]["list"]:
                    if report["daylyDate"].startswith(target_date):
                        # 提取所需数据
                        return {
                            "conclusion": report["counclesion"],
                            "indexConclusion": report["indexConclusion"],
                            "animalConclusion": report["animalConclusion"],
                            "aquaticConclusion": report["aquaticConclusion"],
                            "vegetablesConclusion": report["vegetablesConclusion"],
                            "fruitsConclusion": report["fruitsConclusion"],
                            "content": report["countent"],
                            "incOrReduRange": report["incOrReduRange"]
                        }
                        
                self.logger.warning(f"未找到{target_date}的报告")
                return None
            
            self.logger.warning(f"获取数据失败: {data.get('message', '未知错误')}")
            return None
            
        except Exception as e:
            self.logger.error(f"获取日度报告出错: {str(e)}")
            return None

    def _extract_conclusions(self, report):
        """从报告中提取各类结论"""
        try:
            return {
                "index": report.get("indexConclusion", ""),
                "animal": report.get("animalConclusion", ""),
                "aquatic": report.get("aquaticConclusion", ""),
                "vegetables": report.get("vegetablesConclusion", ""),
                "fruits": report.get("fruitsConclusion", ""),
                "range": report.get("incOrReduRange", "")
            }
        except Exception as e:
            self.logger.error(f"提取论出错: {str(e)}")
            return {}

    def fetch_index_data(self, start_date, end_date):
        """获取价格指数数据"""
        try:
            url = "https://pfsc.agri.cn/price_portal/pi-info-day/getPortalPiInfoDay"
            response = requests.post(url, headers=self.headers, verify=False)
            data = response.json()
            
            if data["code"] == 200:
                result = []
                for item in data["content"]:
                    pub_date = datetime.strptime(item["publishDate"], "%Y-%m-%d")
                    if start_date <= pub_date <= end_date:
                        result.append({
                            "日期": item["publishDate"],
                            "农产品批发价格200指数": item["agriculture"],
                            "粮油指数": item["grainAndOil"],
                            "篮子数": item["vegetableBasket"]
                        })
                return result
            return None
            
        except Exception as e:
            self.logger.error(f"获取指数数据失败: {str(e)}")
            return None

    def fetch_pig_price_data(self, start_date, end_date):
        """获取猪肉价格数据"""
        try:
            url = f"{API_BASE_URL}{API_ENDPOINTS['variety_list']}"
            params = {'pid': 'MH'}  # 猪肉品类ID
            
            response = self._make_request(url, method='post', params=params)
            if not response:
                return None
            
            data = response.json()
            if data.get("code") == 200 and data.get("data"):
                # 转换数据格式
                result = []
                for item in data["data"]:
                    if start_date <= datetime.strptime(item["date"], "%Y-%m-%d") <= end_date:
                        result.append({
                            "日期": item["date"],
                            "全国": float(item["national"]),
                            "东北": float(item["northEast"]),
                            "华北": float(item["northChina"]),
                            "华东": float(item["eastChina"]),
                            "华中": float(item["centralChina"]),
                            "华南": float(item["southChina"]),
                            "西南": float(item["southWest"])
                        })
                return result
            
            self.logger.warning(f"获取数据失败: {data.get('message', '未知错误')}")
            return None
            
        except Exception as e:
            self.logger.error(f"获取猪肉价格数据失败: {str(e)}")
            return None

    def fetch_grain_price_data(self, start_date, end_date):
        """获取粮油价格数据"""
        try:
            url = f"{API_BASE_URL}{API_ENDPOINTS['variety_list']}"
            params = {'pid': 'TL'}  # 粮油品类ID
            
            response = self._make_request(url, method='post', params=params)
            if not response:
                return None
            
            data = response.json()
            if data.get("code") == 200 and data.get("data"):
                # 转换数据格式
                result = []
                for item in data["data"]:
                    if start_date <= datetime.strptime(item["date"], "%Y-%m-%d") <= end_date:
                        result.append({
                            "日期": item["date"],
                            "通义粮价指数": float(item["grainPriceIndex"]),
                            "通义粮市指数": float(item["grainMarketIndex"]),
                            "通义粮市第1号": float(item["grainMarketNo1"]),
                            "通义粮天指数": float(item["grainDayIndex"]),
                            "通义���指": float(item["grainIndex"]),
                            "通义粮天指数(干粮)": float(item["grainDayDryIndex"])
                        })
                return result
            
            self.logger.warning(f"获取数据失败: {data.get('message', '未知错误')}")
            return None
            
        except Exception as e:
            self.logger.error(f"获取粮油价格数据失败: {str(e)}")
            return None

    def generate_daily_report(self, date):
        """生成每日分析报告"""
        try:
            report_data = self.fetch_daily_report(date)
            if not report_data:
                self.logger.warning(f"未获取到 {date.strftime('%Y-%m-%d')} 的报告数据")
                return
            
            report_file = os.path.join(
                self.daily_dir,
                f"{date.strftime('%Y年%m月%d日')}_价格分析报告.md"
            )
            
            # 使用更清晰模板格式
            content = f"""# {date.strftime('%Y年%m月%d日')} 农产品价格分析报告

## 一、价格指数变化
{report_data["indexConclusion"]}

## 二、分品类分析

### 1. 畜禽产品
{report_data["animalConclusion"]}

### 2. 水产品
{report_data["aquaticConclusion"]}

### 3. 蔬菜
{report_data["vegetablesConclusion"]}

### 4. 水果
{report_data["fruitsConclusion"]}

## 三、价格波动情况
{report_data["incOrReduRange"]}

## 四、数据说明
- 数据来源: {report_data["source"]}
- 生成时间: {datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')}
- 价格单位: 元/斤
- 涨跌幅: 与上一交易日相比

---
*注: 本报告由系统自动生成,仅供参考。*
"""
            
            with open(report_file, "w", encoding="utf-8") as f:
                f.write(content)
            
            self.logger.info(f"分析报告已生成: {report_file}")
            
        except Exception as e:
            self.logger.error(f"生成分析报告失败: {str(e)}")

    def generate_weekly_report(self, year, week):
        """生成周度汇总报告"""
        try:
            start_date = datetime.strptime(f'{year}-W{week:02d}-1', '%Y-W%W-%w')
            end_date = start_date + timedelta(days=6)
            
            print(f"\n正在生成第{week}周报告...")
            print(f"时间范围: {start_date.strftime('%Y-%m-%d')}{end_date.strftime('%Y-%m-%d')}")
            print("="*50)
            
            # 获周内所有报告
            reports = []
            current = start_date
            total_days = (end_date - start_date).days + 1
            
            for i in range(total_days):
                print(f"\r进度: {i+1}/{total_days} ", end="")
                report = self.fetch_daily_report(current)
                if report:
                    reports.append(report)
                current += timedelta(days=1)
            
            if not reports:
                self.logger.warning("本周无可用数据")
                return
            
            # 计算周度汇总数据
            weekly_summary = self._calculate_weekly_summary(reports)
            
            report_file = os.path.join(
                self.weekly_dir,
                f"{year}年第{week:02d}周_{start_date.strftime('%m月%d日')}-{end_date.strftime('%m月%d日')}_价格分析报告.md"
            )
            
            with open(report_file, "w", encoding="utf-8") as f:
                f.write(f"""# {year}年第{week:02d}周农产品价格分析报告
({start_date.strftime('%Y年%m月%d日')}{end_date.strftime('%Y年%m月%d日')})

## 一、本周价格概况
{weekly_summary['overview']}

## 二、价格指数变化
- 周初: {weekly_summary['index_start']}
- 周末: {weekly_summary['index_end']}
- 度变化: {weekly_summary['index_change']}

## 三、分品类周度分析
### 1. 畜禽产品
{weekly_summary['animal_summary']}

### 2. 水产品
{weekly_summary['aquatic_summary']}

### 3. 蔬菜
{weekly_summary['vegetables_summary']}

### 4. 水果
{weekly_summary['fruits_summary']}

## 四、日度价格详情
""")
            
                for report in reports:
                    pub_date = datetime.strptime(report['daylyDate'][:10], '%Y-%m-%d')
                    f.write(f"""### {pub_date.strftime('%Y年%m月%d日')}
1. 价格指数: {report.get('indexConclusion', '暂无数据')}
2. 畜禽产品: {report.get('animalConclusion', '暂无数据')}
3. 水产品: {report.get('aquaticConclusion', '暂无数据')}
4. 蔬菜: {report.get('vegetablesConclusion', '暂无数据')}
5. 水果: {report.get('fruitsConclusion', '暂无数据')}

""")
            
                f.write(f"""## 五、数据说明
- 数据来源: {reports[0]["source"]}
- 生成时间: {datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')}
- 价格单位: 元/公斤
- 跌幅: 与上期相比

---
*注: 本报告由系统自动生成,仅供参考。*""")
            
            print("\n报告生成完成!")
            self.logger.info(f"周度报告已生成: {report_file}")
            
        except Exception as e:
            self.logger.error(f"生成周度报告失败: {str(e)}")

    def _calculate_weekly_summary(self, reports):
        """计算周度汇总数据"""
        summary = {
            'overview': '',
            'index_start': reports[0].get('indexConclusion', '暂无数据'),
            'index_end': reports[-1].get('indexConclusion', '暂无数据'),
            'index_change': '',
            'animal_summary': '',
            'aquatic_summary': '',
            'vegetables_summary': '',
            'fruits_summary': ''
        }
        
        # 计算价格指数变化
        try:
            start_index = float(reports[0]['indexConclusion'].split('为')[1].split(',')[0])
            end_index = float(reports[-1]['indexConclusion'].split('为')[1].split(',')[0])
            change = end_index - start_index
            summary['index_change'] = f"{'上升' if change >= 0 else '下降'}{abs(change):.2f}个点"
        except:
            summary['index_change'] = '数据异常'
        
        # 生成概述
        summary['overview'] = f"本周农产品批发价格200指数从{summary['index_start']},到{summary['index_end']},整体{summary['index_change']}。"
        
        # 其他品类汇总...
        
        return summary

    def plot_index_trend(self, start_date, end_date):
        """绘制价格指数走势图"""
        try:
            data = self.fetch_index_data(start_date, end_date)
            if not data:
                return
            
            plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
            ax = plt.gca()
            ax.set_facecolor('#f8fcfa')
            
            dates = [item["日期"] for item in data]
            indices = [
                ("农品批发价格200指数", "#ffa940"),
                ("菜篮子指", "#73d13d"),
                ("粮油指数", "#40a9ff")
            ]
            
            for name, color in indices:
                values = [item[name] for item in data]
                plt.plot(dates, values, color=color, marker='o',
                        markersize=4, linewidth=2, label=name)
            
            plt.title('农业农村部"农产品批发价格200指数"日度走势图',
                     pad=20, fontsize=12, loc='left')
            plt.grid(True, linestyle='--', alpha=0.3)
            plt.xticks(rotation=45)
            plt.legend(loc='upper right', frameon=False)
            
            plt.tight_layout()
            plt.savefig(
                os.path.join(self.output_dir, "价格指数走势图.png"),
                dpi=300,
                bbox_inches='tight'
            )
            plt.close()
            
            self.logger.info("价格指数走势已生成")
            
        except Exception as e:
            self.logger.error(f"生成价格指数走势图失败: {str(e)}")

    def plot_pig_price_trend(self, start_date, end_date):
        """绘制猪肉价格走势图"""
        try:
            data = self.fetch_pig_price_data(start_date, end_date)
            if not data:
                return
            
            plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
            ax = plt.gca()
            ax.set_facecolor('#f8fcfa')
            
            dates = [item["日期"] for item in data]
            values = [item["全国"] for item in data]
            
            plt.plot(dates, values, color='#40a9ff', marker='o',
                    markersize=4, linewidth=2)
            plt.fill_between(dates, values, color='#e6f7ff', alpha=0.5)
            
            plt.title('"瘦肉型白条猪肉出厂价格指数"全国走势图',
                     pad=20, fontsize=12, loc='left')
            plt.grid(True, linestyle='--', alpha=0.3)
            plt.xticks(rotation=45)
            
            plt.tight_layout()
            plt.savefig(
                os.path.join(self.output_dir, "猪肉价格走势图.png"),
                dpi=300,
                bbox_inches='tight'
            )
            plt.close()
            
            self.logger.info("猪肉价格走势图已生成")
            
        except Exception as e:
            self.logger.error(f"生成猪肉价格走势图失败: {str(e)}")

    def plot_pig_price_region_trend(self, start_date, end_date):
        """绘制猪肉分区域价格走势图"""
        try:
            data = self.fetch_pig_price_data(start_date, end_date)
            if not data:
                return
            
            plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
            ax = plt.gca()
            ax.set_facecolor('#f8fcfa')
            
            dates = [item["日期"] for item in data]
            regions = [
                ("东北", "#40a9ff"),
                ("华南", "#73d13d"),
                ("华", "#ffa940"),
                ("华中", "#ff4d4f"),
                ("华东", "#9254de"),
                ("西南", "#36cfc9")
            ]
            
            for region, color in regions:
                values = [item[region] for item in data]
                plt.plot(dates, values, color=color, marker='o',
                        markersize=4, linewidth=2, label=region)
            
            plt.title('"瘦肉型条猪肉出厂价格指数"区域走势图',
                     pad=20, fontsize=12, loc='left')
            plt.grid(True, linestyle='--', alpha=0.3)
            plt.xticks(rotation=45)
            plt.legend(loc='upper right', frameon=False)
            
            plt.tight_layout()
            plt.savefig(
                os.path.join(self.output_dir, "猪肉价格区域走势图.png"),
                dpi=300,
                bbox_inches='tight'
            )
            plt.close()
            
            self.logger.info("猪肉价格区域走势图已生成")
            
        except Exception as e:
            self.logger.error(f"生成猪肉价格区域走势图失败: {str(e)}")

    def plot_grain_price_trend(self, start_date, end_date):
        """绘制粮油价格走势图"""
        try:
            data = self.fetch_grain_price_data(start_date, end_date)
            if not data:
                return
            
            plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
            ax = plt.gca()
            ax.set_facecolor('#f8fcfa')
            
            dates = [item["日期"] for item in data]
            indices = [
                ("通义粮价指数", "#40a9ff"),
                ("通义粮市指数", "#73d13d"),
                ("通义粮市第1号", "#ffa940"),
                ("通义粮天指数", "#ff4d4f"),
                ("通义粮指", "#9254de"),
                ("通义粮天指数(干粮)", "#36cfc9")
            ]
            
            for name, color in indices:
                values = [item[name] for item in data]
                plt.plot(dates, values, color=color, marker='o',
                        markersize=4, linewidth=2, label=name)
            
            plt.title('中国通义粮油发价格指数走势图',
                     pad=20, fontsize=12, loc='left')
            plt.grid(True, linestyle='--', alpha=0.3)
            plt.xticks(rotation=45)
            plt.legend(loc='upper right', frameon=False)
            
            plt.tight_layout()
            plt.savefig(
                os.path.join(self.output_dir, "粮油价格指数走势图.png"),
                dpi=300,
                bbox_inches='tight'
            )
            plt.close()
            
            self.logger.info("粮油价格指数走势图已生成")
            
        except Exception as e:
            self.logger.error(f"生成粮油价格指数走势失败: {str(e)}")

    def export_data(self, start_date, end_date, format='excel'):
        """导出数据
        
        Args:
            start_date: 开始日期
            end_date: 结束日期
            format: 导出格式,支持 'excel'/'csv'/'json'
        """
        try:
            # 获取数据
            data = {
                'index': self.fetch_index_data(start_date, end_date),
                'pig': self.fetch_pig_price_data(start_date, end_date),
                'grain': self.fetch_grain_price_data(start_date, end_date)
            }
            
            if not any(data.values()):
                return
            
            # 根据格式导出
            if format == 'excel':
                self._export_excel(data, start_date, end_date)
            elif format == 'csv':
                self._export_csv(data, start_date, end_date)
            elif format == 'json':
                self._export_json(data, start_date, end_date)
            else:
                self.logger.error(f"不支持的导出格式: {format}")
            
        except Exception as e:
            self.logger.error(f"导出数据失败: {str(e)}")

    def _clean_text(self, text):
        """清理文本内容"""
        if not text:
            return ""
        # 去除多余空白字符
        text = ' '.join(text.split())
        # 修复可能的标点符号问题
        text = text.replace('。。', '。').replace(',。', '。').replace(';。', '。')
        # 修复中文编码
        text = text.encode('utf-8').decode('utf-8', 'ignore')
        return text

    def _validate_report_data(self, report):
        """验证报告数据完整性"""
        required_fields = [
            "indexConclusion",
            "animalConclusion",
            "aquaticConclusion",
            "vegetablesConclusion",
            "fruitsConclusion"
        ]
        
        is_valid = True
        for field in required_fields:
            if not report.get(field):
                self.logger.warning(f"报告缺少 {field} 数据")
                is_valid = False
                report[field] = "暂无数据"
        
        return is_valid

    def _export_excel(self, data, start_date, end_date):
        """导出Excel数据"""
        try:
            filename = f"价格数据_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}.xlsx"
            filepath = os.path.join(self.output_dir, filename)
            
            with pd.ExcelWriter(filepath) as writer:
                # 导出价格指数
                if data.get('index'):
                    df_index = pd.DataFrame(data['index'])
                    df_index.to_excel(writer, sheet_name='价格指数', index=False)
                    
                # 导出猪肉价格
                if data.get('pig'):
                    df_pig = pd.DataFrame(data['pig'])
                    df_pig.to_excel(writer, sheet_name='猪肉价格', index=False)
                    
                # 导出粮油价格
                if data.get('grain'):
                    df_grain = pd.DataFrame(data['grain'])
                    df_grain.to_excel(writer, sheet_name='粮油价格', index=False)
                    
            self.logger.info(f"���据已导出至: {filepath}")
            return True
            
        except Exception as e:
            self.logger.error(f"导出Excel失败: {str(e)}")
            return False

    def fetch_all_categories(self):
        """获取所有品类数据"""
        categories = {
            'MH': '猪肉',
            'SC': '蔬菜',
            'SG': '水果',
            'TL': '粮油',
            'SC': '水产',
            'DJ': '蛋鸡',
            'NR': '牛肉',
            'YR': '羊肉'
        }
        
        result = {}
        for code, name in categories.items():
            try:
                url = f"{API_BASE_URL}{API_ENDPOINTS['variety_list']}"
                params = {'pid': code}
                response = self._make_request(url, method='post', params=params)
                if response and response.json().get("code") == 200:
                    result[name] = response.json().get("data", [])
            except Exception as e:
                self.logger.error(f"获取{name}品类数据失败: {str(e)}")
        return result

    def fetch_market_prices(self, market_id=None, variety_id=None, start_date=None, end_date=None):
        """获取市场价格数据"""
        try:
            url = f"{API_BASE_URL}{API_ENDPOINTS['daily_price']}"
            params = {
                'marketId': market_id,
                'varietyId': variety_id,
                'startDate': start_date.strftime("%Y-%m-%d") if start_date else None,
                'endDate': end_date.strftime("%Y-%m-%d") if end_date else None
            }
            
            response = self._make_request(url, method='post', params=params)
            if response and response.json().get("code") == 200:
                return response.json().get("data", [])
            return None
        except Exception as e:
            self.logger.error(f"获取市场价格数据失败: {str(e)}")
            return None

    def fetch_analysis_reports(self, page=1, page_size=10):
        """获取分析报告列表"""
        try:
            url = f"{API_BASE_URL}{API_ENDPOINTS['analysis_report']}"
            params = {
                'pageNum': page,
                'pageSize': page_size
            }
            
            response = self._make_request(url, method='post', params=params)
            if response and response.json().get("code") == 200:
                return response.json().get("data", {}).get("list", [])
            return None
        except Exception as e:
            self.logger.error(f"获取分析报告失败: {str(e)}")
            return None

    def crawl_all_data(self, start_date, end_date):
        """爬取所有数据"""
        try:
            # 获取所有品类
            categories = self.fetch_all_categories()
            
            # 获取所有市场
            markets_response = self._make_request(
                f"{API_BASE_URL}{API_ENDPOINTS['market_list']}", 
                method='post'
            )
            markets = markets_response.json().get("data", []) if markets_response else []
            
            # 存储结果
            results = {
                'categories': categories,
                'markets': markets,
                'prices': {},
                'reports': []
            }
            
            # 获取每个品类的价格数据
            for category, varieties in categories.items():
                results['prices'][category] = {}
                for variety in varieties:
                    variety_id = variety.get('id')
                    if variety_id:
                        prices = self.fetch_market_prices(
                            variety_id=variety_id,
                            start_date=start_date,
                            end_date=end_date
                        )
                        results['prices'][category][variety.get('name')] = prices
            
            # 获取分析报告
            page = 1
            while True:
                reports = self.fetch_analysis_reports(page=page)
                if not reports:
                    break
                results['reports'].extend(reports)
                page += 1
                
            return results
            
        except Exception as e:
            self.logger.error(f"爬取所有数据失败: {str(e)}")
            return None

    def fetch_weekly_report_content(self, report_id=None):
        """获取周度报告内容"""
        try:
            url = f"{API_BASE_URL}/product/analysis-report/getReportContent"
            params = {'id': report_id} if report_id else None
            
            response = self._make_request(url, method='post', params=params)
            if not response:
                return None
            
            # 解析HTML内容
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # 提取报告基本信息
            title = soup.find('h1', class_='report-title').text.strip()
            date = soup.find('div', class_='report-date').text.strip()
            source = soup.find('div', class_='report-source').text.strip()
            
            # 提取报告主体内容
            content = soup.find('div', class_='report-content')
            
            # 提取表格数据
            tables = []
            for table in content.find_all('table'):
                df = pd.read_html(str(table))[0]
                tables.append(df.to_dict('records'))
                
            # 提取文本内容
            paragraphs = []
            for p in content.find_all('p'):
                text = p.text.strip()
                if text:
                    paragraphs.append(text)
                    
            return {
                'title': title,
                'date': date,
                'source': source,
                'content': {
                    'text': paragraphs,
                    'tables': tables
                }
            }
            
        except Exception as e:
            self.logger.error(f"获取报告内容失败: {str(e)}")
            return None

    def crawl_all_reports(self, start_date=None, end_date=None):
        """爬取所有报告"""
        try:
            reports = []
            page = 1
            
            while True:
                # 获取报告列表
                report_list = self.fetch_analysis_reports(page=page)
                if not report_list:
                    break
                    
                # 过滤日期范围
                if start_date or end_date:
                    filtered_reports = []
                    for report in report_list:
                        report_date = datetime.strptime(report['publishDate'], '%Y-%m-%d')
                        if start_date and report_date < start_date:
                            continue
                        if end_date and report_date > end_date:
                            continue
                        filtered_reports.append(report)
                    report_list = filtered_reports
                    
                # 获取每个报告的详细内容
                for report in report_list:
                    report_content = self.fetch_weekly_report_content(report['id'])
                    if report_content:
                        reports.append({
                            'meta': report,
                            'content': report_content
                        })
                        
                    # 添加延时避免请求过快
                    time.sleep(1)
                    
                page += 1
                
            return reports
            
        except Exception as e:
            self.logger.error(f"爬取报告失败: {str(e)}")
            return None

    def save_reports(self, reports, output_dir='reports'):
        """保存报告到文件"""
        try:
            if not os.path.exists(output_dir):
                os.makedirs(output_dir)
                
            for report in reports:
                # 生成文件名
                date = datetime.strptime(report['meta']['publishDate'], '%Y-%m-%d')
                filename = f"{date.strftime('%Y%m%d')}_{report['meta']['id']}.json"
                filepath = os.path.join(output_dir, filename)
                
                # 保存为JSON文件
                with open(filepath, 'w', encoding='utf-8') as f:
                    json.dump(report, f, ensure_ascii=False, indent=2)
                    
            return True
            
        except Exception as e:
            self.logger.error(f"保存报告失败: {str(e)}")
            return False

if __name__ == "__main__":
    try:
        # 检查并安装依赖包
        check_and_install_packages()
        
        # 运行爬虫
        crawler = ReportCrawler()
        crawler.run()
        
    except KeyboardInterrupt:
        print("\n程序已被用中断")
        sys.exit(0)
    except Exception as e:
        print(f"\n程序运行出错: {str(e)}")
        sys.exit(1) 

一、功能介绍

本程序用于爬取农业农村部发布的农产品价格监测报告,包括以下功能:

1. 日度报告

  • 生成今日分析报告
  • 生成指定日期报告
  • 包含价格指数、分品类分析等

2. 周度报告

  • 生成本周分析报告
  • 生成指定周报告
  • 汇总周内价格变化

3. 价格走势

  • 农产品价格200指数走势
  • 猪肉价格全国走势
  • 猪肉价格区域走势
  • 粮油价格指数走势

4. 数据导出

  • 支持Excel格式导出
  • 包含多个数据分类
  • 支持时间范围选择

二、使用说明

1. 环境要求

  • Python 3.7+
  • 依赖包会自动安装:
    • requests: HTTP请求
    • pandas: 数据处理
    • matplotlib: 绘图支持
    • urllib3: HTTP客户端
    • openpyxl: Excel支持
    • colorama: 控制台颜色

2. 运行方法

python
直接运行程序
python report_crawler.py

3. 功能菜单

农产品价格报告爬虫系统

生成今日分析报告
生成本周分析报告
生成指定日期报告
生成指定周报告
生成价格指数走势图
生成猪肉价格走势图
生成区域价格走势图
生成粮油价格走势图
导出Excel数据
退出系统

4. 输出文件

  • reports/daily: 日度分析报告
  • reports/weekly: 周度分析报告
  • reports: 价格走势图和Excel数据

三、数据来源

  • 农业农村部市场信息中心
  • 数据更新频率: 每日14:00

四、注意事项

  1. 首次运行会自动检查并安装依赖包
  2. 所有数据仅供学习交流使用
  3. 建议使用时设置合理的时间范围
  4. 如遇到问题可查看日志文件

五、更新记录

v1.0.0 (2024-12-05)

  • 实现基础数据爬取功能
  • 支持生成分析报告
  • 支持绘制价格走势图
  • 支持导出Excel数据

六、联系方式

作者: xiaohai
仅用于学习交流


http://www.kler.cn/a/520682.html

相关文章:

  • 【25考研】人大计算机考研复试该怎么准备?有哪些注意事项?
  • 基于语义-拓扑-度量表征引导的大语言模型推理的空中视觉语言导航
  • 14-6-1C++STL的list
  • 国产编辑器EverEdit - 大纲视图
  • [Dialog屏幕开发] 屏幕绘制(下拉菜单)
  • Android GLSurfaceView 覆盖其它控件问题 (RK平台)
  • Prompt 编写进阶指南
  • 快速构建springboot+java+mongodb新闻发布系统简单Demo
  • Ansible入门学习之基础元素介绍
  • 第23章 质量至上与家庭交底
  • 从崩溃难题看 C 标准库与 Rust:线程安全问题引发的深度思考
  • Leetcode100热题——盛水最多容器
  • Nginx前端后端共用一个域名如何配置
  • 机密信息密送- 文字加密解密
  • Vue.js组件开发-实现多个文件附件压缩下载
  • 力扣-链表-203 移除链表元素
  • 大模型训练策略与架构优化实践指南
  • ES6+新特性,var、let 和 const 的区别
  • 分布式系统学习10:分布式事务
  • 学习std::is_base_of笔记
  • 可以称之为“yyds”的物联网开源框架有哪几个?
  • [java] 集合-ArrayList篇
  • Rust:Rhai脚本编程示例
  • 设计模式Python版 原型模式
  • 【Validator】字段验证器介绍,及基本使用go案例
  • MongoDB中的横向扩容数据分片