农产品价格报告爬虫使用说明
农产品价格报告爬虫使用说明
# **************************************************************************
# * *
# * 农产品价格报告爬虫 *
# * *
# * 作者: xiaohai *
# * 版本: v1.0.0 *
# * 日期: 2024-12-05 *
# * *
# * 功能说明: *
# * 1. 日度报告 *
# * - 生成今日分析报告 *
# * - 生成指定日期报告 *
# * - 包含价格指数、分品类分析等 *
# * *
# * 2. 周度报告 *
# * - 生成本周分析报告 *
# * - 生成指定周报告 *
# * - 汇总周内价格变化 *
# * *
# * 3. 价格走势 *
# * - 农产品价格200指数走势 *
# * - 猪肉价格全国走势 *
# * - 猪肉价格区域走势 *
# * - 粮油价格指数走势 *
# * *
# * 4. 数据导出 *
# * - 支持Excel格式导出 *
# * - 包含多个数据分类 *
# * - 支持时间范围选择 *
# * *
# * : 农业农村部市场信息中心 *
# * 版权声明: 仅用于学习交流 *
# * *
# **************************************************************************
import os
import json
import logging
import requests
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import pandas as pd
import warnings
import urllib3
import sys
import subprocess
import pkg_resources
from bs4 import BeautifulSoup
import re
import time
# 禁用SSL警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
warnings.filterwarnings('ignore')
# 配置常量
VERSION = 'v1.0.0'
AUTHOR = 'xiaohai'
DATA_SOURCE = '农业农村部市场信息中心'
# API配置
API_BASE_URL = 'https://ncpscxx.moa.gov.cn'
API_ENDPOINTS = {
'price_index': '/product/common-price-index/getIndexList',
'variety_list': '/product/sys-variety/selectList',
'price_trend': '/product/price-info/getPriceInfoList',
'market_list': '/product/sys-market/selectList',
'daily_price': '/product/price-info/getDailyPrice',
'analysis_report': '/product/analysis-report/pageList'
}
# 输出目录配置
OUTPUT_DIRS = {
'base': 'reports',
'daily': 'reports/daily',
'weekly': 'reports/weekly'
}
# 图表样式配置
CHART_STYLE = {
'figure': {
'figsize': (12, 6),
'facecolor': '#f8fcfa'
},
'grid': {
'linestyle': '--',
'alpha': 0.3,
'color': 'gray'
},
'line': {
'marker': 'o',
'markersize': 4,
'linewidth': 2
},
'colors': {
'blue': '#40a9ff',
'green': '#73d13d',
'orange': '#ffa940',
'red': '#ff4d4f',
'purple': '#9254de',
'cyan': '#36cfc9'
}
}
def check_and_install_packages():
"""检查并安装所需的包"""
required_packages = {
'requests': 'requests', # HTTP请求
'pandas': 'pandas', # 数据处理
'matplotlib': 'matplotlib', # 绘图支持
'urllib3': 'urllib3', # HTTP客户端
'openpyxl': 'openpyxl', # Excel支持
'colorama': 'colorama' # 控制台颜色
}
print("\n" + "="*50)
print("检查并安装依赖包...")
print("="*50)
try:
import colorama
colorama.init()
success_mark = colorama.Fore.GREEN + "✓" + colorama.Style.RESET_ALL
error_mark = colorama.Fore.RED + "✗" + colorama.Style.RESET_ALL
except ImportError:
success_mark = "✓"
error_mark = "✗"
all_success = True
for package, import_name in required_packages.items():
try:
pkg_resources.require(package)
print(f"{success_mark} {package:15} 已安装")
except (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict):
print(f"{error_mark} {package:15} 未安装,正在安装...")
try:
subprocess.check_call([
sys.executable,
"-m",
"pip",
"install",
"--disable-pip-version-check",
"--no-cache-dir",
package
], stdout=subprocess.DEVNULL)
print(f"{success_mark} {package:15} 安装成功")
except Exception as e:
print(f"{error_mark} {package:15} 安装失败: {str(e)}")
all_success = False
print("\n依赖包检查" + ("全部完成" if all_success else "存在问题"))
print("="*50 + "\n")
if not all_success:
print("某些依赖包安装失败,程序能无法正常运行!")
if input("是否继续运行?(y/n): ").lower() != 'y':
sys.exit(1)
class ReportCrawler:
"""农产品价格报告爬虫"""
def __init__(self):
# 禁用SSL警告
warnings.filterwarnings('ignore')
# 基础配置
self._setup_directories()
self._setup_logger()
self._setup_api()
def _setup_directories(self):
"""创建输出目录"""
self.output_dir = "reports"
self.daily_dir = os.path.join(self.output_dir, "daily")
self.weekly_dir = os.path.join(self.output_dir, "weekly")
for d in [self.output_dir, self.daily_dir, self.weekly_dir]:
if not os.path.exists(d):
os.makedirs(d)
def _setup_logger(self):
"""配置日志"""
log_file = os.path.join("logs", f"crawler_{datetime.now().strftime('%Y%m%d')}.log")
os.makedirs("logs", exist_ok=True)
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 文件处理器
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setFormatter(formatter)
# 制台处理器
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
# 配置日志器
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
def _setup_api(self):
"""配置API"""
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'Origin': 'https://ncpscxx.moa.gov.cn',
'Referer': 'https://ncpscxx.moa.gov.cn/',
'Accept': 'application/json, text/plain, */*',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
'Content-Type': 'application/json;charset=UTF-8'
}
def show_menu(self):
"""显示功能菜单"""
menu = """
农产品价格报告爬虫系统
====================
1. 成今日分析报告
2. 生成本周分报告
3. 生成指定日期报告
4. 生成指定周报告
5. 生成价格指数走势图
6. 生成猪肉价格走势图
7. 生成区域价格走势图
8. 生成粮油价格走势图
9. 导出Excel数据
0. 退出系统
请输入���能编号(0-9): """
print("\n" + "="*50) # 添加分隔线
choice = input(menu)
print("="*50 + "\n") # 添加分隔线
return choice
def run(self):
"""运行系统"""
while True:
choice = self.show_menu()
if choice == "0":
print("感谢使用,再见!")
break
elif choice == "1":
print("正在生成今日分析报告...")
self.generate_daily_report(datetime.now())
elif choice == "2":
print("正在生成本周分析报告...")
today = datetime.now()
self.generate_weekly_report(today.year, int(today.strftime("%W")))
elif choice == "3":
date_str = input("请输入日期(格式:YYYY-MM-DD): ")
try:
date = datetime.strptime(date_str, "%Y-%m-%d")
self.generate_daily_report(date)
except:
print("日期格式错误!")
elif choice == "4":
year = int(input("请输入年份: "))
week = int(input("请输入周数(1-52): "))
self.generate_weekly_report(year, week)
elif choice == "5":
days = int(input("请输入要查看的天数: "))
end = datetime.now()
start = end - timedelta(days=days)
self.plot_index_trend(start, end)
elif choice == "6":
days = int(input("请输入要查看的天数: "))
end = datetime.now()
start = end - timedelta(days=days)
self.plot_pig_price_trend(start, end)
elif choice == "7":
days = int(input("请输入要查看的天数: "))
end = datetime.now()
start = end - timedelta(days=days)
self.plot_pig_price_region_trend(start, end)
elif choice == "8":
days = int(input("请输入要查看的天数: "))
end = datetime.now()
start = end - timedelta(days=days)
self.plot_grain_price_trend(start, end)
elif choice == "9":
days = int(input("请输入要导天数: "))
end = datetime.now()
start = end - timedelta(days=days)
self.export_data(start, end)
else:
print("无效的选择,请重试!")
input("\n按回车键继续...")
def _make_request(self, url, method='get', params=None, data=None):
"""发送HTTP请求
Args:
url: 请求URL
method: 请求方法,支持 'get'/'post'
params: URL参数
data: POST数据
Returns:
Response对象或None(请求失败)
"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
if method.lower() == 'get':
response = requests.get(
url,
params=params,
headers=headers,
verify=False,
timeout=10
)
else:
response = requests.post(
url,
params=params,
json=data, # 添加json参数支持
headers=headers,
verify=False,
timeout=10
)
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
self.logger.error(f"请求失败: {str(e)}")
return None
def fetch_daily_report(self, date):
"""获取日度价格报告"""
try:
url = f"{API_BASE_URL}/api/FarmDaily/list"
data = {
"daylyDate": date.strftime("%Y-%m-%d")
}
response = self._make_request(url, method='post', data=data)
if not response:
return None
data = response.json()
if data.get("code") == 200 and data.get("content",{}).get("list"):
# 找到指定日期的报告
target_date = date.strftime("%Y-%m-%d")
for report in data["content"]["list"]:
if report["daylyDate"].startswith(target_date):
# 提取所需数据
return {
"conclusion": report["counclesion"],
"indexConclusion": report["indexConclusion"],
"animalConclusion": report["animalConclusion"],
"aquaticConclusion": report["aquaticConclusion"],
"vegetablesConclusion": report["vegetablesConclusion"],
"fruitsConclusion": report["fruitsConclusion"],
"content": report["countent"],
"incOrReduRange": report["incOrReduRange"]
}
self.logger.warning(f"未找到{target_date}的报告")
return None
self.logger.warning(f"获取数据失败: {data.get('message', '未知错误')}")
return None
except Exception as e:
self.logger.error(f"获取日度报告出错: {str(e)}")
return None
def _extract_conclusions(self, report):
"""从报告中提取各类结论"""
try:
return {
"index": report.get("indexConclusion", ""),
"animal": report.get("animalConclusion", ""),
"aquatic": report.get("aquaticConclusion", ""),
"vegetables": report.get("vegetablesConclusion", ""),
"fruits": report.get("fruitsConclusion", ""),
"range": report.get("incOrReduRange", "")
}
except Exception as e:
self.logger.error(f"提取论出错: {str(e)}")
return {}
def fetch_index_data(self, start_date, end_date):
"""获取价格指数数据"""
try:
url = "https://pfsc.agri.cn/price_portal/pi-info-day/getPortalPiInfoDay"
response = requests.post(url, headers=self.headers, verify=False)
data = response.json()
if data["code"] == 200:
result = []
for item in data["content"]:
pub_date = datetime.strptime(item["publishDate"], "%Y-%m-%d")
if start_date <= pub_date <= end_date:
result.append({
"日期": item["publishDate"],
"农产品批发价格200指数": item["agriculture"],
"粮油指数": item["grainAndOil"],
"篮子数": item["vegetableBasket"]
})
return result
return None
except Exception as e:
self.logger.error(f"获取指数数据失败: {str(e)}")
return None
def fetch_pig_price_data(self, start_date, end_date):
"""获取猪肉价格数据"""
try:
url = f"{API_BASE_URL}{API_ENDPOINTS['variety_list']}"
params = {'pid': 'MH'} # 猪肉品类ID
response = self._make_request(url, method='post', params=params)
if not response:
return None
data = response.json()
if data.get("code") == 200 and data.get("data"):
# 转换数据格式
result = []
for item in data["data"]:
if start_date <= datetime.strptime(item["date"], "%Y-%m-%d") <= end_date:
result.append({
"日期": item["date"],
"全国": float(item["national"]),
"东北": float(item["northEast"]),
"华北": float(item["northChina"]),
"华东": float(item["eastChina"]),
"华中": float(item["centralChina"]),
"华南": float(item["southChina"]),
"西南": float(item["southWest"])
})
return result
self.logger.warning(f"获取数据失败: {data.get('message', '未知错误')}")
return None
except Exception as e:
self.logger.error(f"获取猪肉价格数据失败: {str(e)}")
return None
def fetch_grain_price_data(self, start_date, end_date):
"""获取粮油价格数据"""
try:
url = f"{API_BASE_URL}{API_ENDPOINTS['variety_list']}"
params = {'pid': 'TL'} # 粮油品类ID
response = self._make_request(url, method='post', params=params)
if not response:
return None
data = response.json()
if data.get("code") == 200 and data.get("data"):
# 转换数据格式
result = []
for item in data["data"]:
if start_date <= datetime.strptime(item["date"], "%Y-%m-%d") <= end_date:
result.append({
"日期": item["date"],
"通义粮价指数": float(item["grainPriceIndex"]),
"通义粮市指数": float(item["grainMarketIndex"]),
"通义粮市第1号": float(item["grainMarketNo1"]),
"通义粮天指数": float(item["grainDayIndex"]),
"通义���指": float(item["grainIndex"]),
"通义粮天指数(干粮)": float(item["grainDayDryIndex"])
})
return result
self.logger.warning(f"获取数据失败: {data.get('message', '未知错误')}")
return None
except Exception as e:
self.logger.error(f"获取粮油价格数据失败: {str(e)}")
return None
def generate_daily_report(self, date):
"""生成每日分析报告"""
try:
report_data = self.fetch_daily_report(date)
if not report_data:
self.logger.warning(f"未获取到 {date.strftime('%Y-%m-%d')} 的报告数据")
return
report_file = os.path.join(
self.daily_dir,
f"{date.strftime('%Y年%m月%d日')}_价格分析报告.md"
)
# 使用更清晰模板格式
content = f"""# {date.strftime('%Y年%m月%d日')} 农产品价格分析报告
## 一、价格指数变化
{report_data["indexConclusion"]}
## 二、分品类分析
### 1. 畜禽产品
{report_data["animalConclusion"]}
### 2. 水产品
{report_data["aquaticConclusion"]}
### 3. 蔬菜
{report_data["vegetablesConclusion"]}
### 4. 水果
{report_data["fruitsConclusion"]}
## 三、价格波动情况
{report_data["incOrReduRange"]}
## 四、数据说明
- 数据来源: {report_data["source"]}
- 生成时间: {datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')}
- 价格单位: 元/斤
- 涨跌幅: 与上一交易日相比
---
*注: 本报告由系统自动生成,仅供参考。*
"""
with open(report_file, "w", encoding="utf-8") as f:
f.write(content)
self.logger.info(f"分析报告已生成: {report_file}")
except Exception as e:
self.logger.error(f"生成分析报告失败: {str(e)}")
def generate_weekly_report(self, year, week):
"""生成周度汇总报告"""
try:
start_date = datetime.strptime(f'{year}-W{week:02d}-1', '%Y-W%W-%w')
end_date = start_date + timedelta(days=6)
print(f"\n正在生成第{week}周报告...")
print(f"时间范围: {start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}")
print("="*50)
# 获周内所有报告
reports = []
current = start_date
total_days = (end_date - start_date).days + 1
for i in range(total_days):
print(f"\r进度: {i+1}/{total_days} ", end="")
report = self.fetch_daily_report(current)
if report:
reports.append(report)
current += timedelta(days=1)
if not reports:
self.logger.warning("本周无可用数据")
return
# 计算周度汇总数据
weekly_summary = self._calculate_weekly_summary(reports)
report_file = os.path.join(
self.weekly_dir,
f"{year}年第{week:02d}周_{start_date.strftime('%m月%d日')}-{end_date.strftime('%m月%d日')}_价格分析报告.md"
)
with open(report_file, "w", encoding="utf-8") as f:
f.write(f"""# {year}年第{week:02d}周农产品价格分析报告
({start_date.strftime('%Y年%m月%d日')} 至 {end_date.strftime('%Y年%m月%d日')})
## 一、本周价格概况
{weekly_summary['overview']}
## 二、价格指数变化
- 周初: {weekly_summary['index_start']}
- 周末: {weekly_summary['index_end']}
- 度变化: {weekly_summary['index_change']}
## 三、分品类周度分析
### 1. 畜禽产品
{weekly_summary['animal_summary']}
### 2. 水产品
{weekly_summary['aquatic_summary']}
### 3. 蔬菜
{weekly_summary['vegetables_summary']}
### 4. 水果
{weekly_summary['fruits_summary']}
## 四、日度价格详情
""")
for report in reports:
pub_date = datetime.strptime(report['daylyDate'][:10], '%Y-%m-%d')
f.write(f"""### {pub_date.strftime('%Y年%m月%d日')}
1. 价格指数: {report.get('indexConclusion', '暂无数据')}
2. 畜禽产品: {report.get('animalConclusion', '暂无数据')}
3. 水产品: {report.get('aquaticConclusion', '暂无数据')}
4. 蔬菜: {report.get('vegetablesConclusion', '暂无数据')}
5. 水果: {report.get('fruitsConclusion', '暂无数据')}
""")
f.write(f"""## 五、数据说明
- 数据来源: {reports[0]["source"]}
- 生成时间: {datetime.now().strftime('%Y年%m月%d日 %H:%M:%S')}
- 价格单位: 元/公斤
- 跌幅: 与上期相比
---
*注: 本报告由系统自动生成,仅供参考。*""")
print("\n报告生成完成!")
self.logger.info(f"周度报告已生成: {report_file}")
except Exception as e:
self.logger.error(f"生成周度报告失败: {str(e)}")
def _calculate_weekly_summary(self, reports):
"""计算周度汇总数据"""
summary = {
'overview': '',
'index_start': reports[0].get('indexConclusion', '暂无数据'),
'index_end': reports[-1].get('indexConclusion', '暂无数据'),
'index_change': '',
'animal_summary': '',
'aquatic_summary': '',
'vegetables_summary': '',
'fruits_summary': ''
}
# 计算价格指数变化
try:
start_index = float(reports[0]['indexConclusion'].split('为')[1].split(',')[0])
end_index = float(reports[-1]['indexConclusion'].split('为')[1].split(',')[0])
change = end_index - start_index
summary['index_change'] = f"{'上升' if change >= 0 else '下降'}{abs(change):.2f}个点"
except:
summary['index_change'] = '数据异常'
# 生成概述
summary['overview'] = f"本周农产品批发价格200指数从{summary['index_start']},到{summary['index_end']},整体{summary['index_change']}。"
# 其他品类汇总...
return summary
def plot_index_trend(self, start_date, end_date):
"""绘制价格指数走势图"""
try:
data = self.fetch_index_data(start_date, end_date)
if not data:
return
plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
ax = plt.gca()
ax.set_facecolor('#f8fcfa')
dates = [item["日期"] for item in data]
indices = [
("农品批发价格200指数", "#ffa940"),
("菜篮子指", "#73d13d"),
("粮油指数", "#40a9ff")
]
for name, color in indices:
values = [item[name] for item in data]
plt.plot(dates, values, color=color, marker='o',
markersize=4, linewidth=2, label=name)
plt.title('农业农村部"农产品批发价格200指数"日度走势图',
pad=20, fontsize=12, loc='left')
plt.grid(True, linestyle='--', alpha=0.3)
plt.xticks(rotation=45)
plt.legend(loc='upper right', frameon=False)
plt.tight_layout()
plt.savefig(
os.path.join(self.output_dir, "价格指数走势图.png"),
dpi=300,
bbox_inches='tight'
)
plt.close()
self.logger.info("价格指数走势已生成")
except Exception as e:
self.logger.error(f"生成价格指数走势图失败: {str(e)}")
def plot_pig_price_trend(self, start_date, end_date):
"""绘制猪肉价格走势图"""
try:
data = self.fetch_pig_price_data(start_date, end_date)
if not data:
return
plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
ax = plt.gca()
ax.set_facecolor('#f8fcfa')
dates = [item["日期"] for item in data]
values = [item["全国"] for item in data]
plt.plot(dates, values, color='#40a9ff', marker='o',
markersize=4, linewidth=2)
plt.fill_between(dates, values, color='#e6f7ff', alpha=0.5)
plt.title('"瘦肉型白条猪肉出厂价格指数"全国走势图',
pad=20, fontsize=12, loc='left')
plt.grid(True, linestyle='--', alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(
os.path.join(self.output_dir, "猪肉价格走势图.png"),
dpi=300,
bbox_inches='tight'
)
plt.close()
self.logger.info("猪肉价格走势图已生成")
except Exception as e:
self.logger.error(f"生成猪肉价格走势图失败: {str(e)}")
def plot_pig_price_region_trend(self, start_date, end_date):
"""绘制猪肉分区域价格走势图"""
try:
data = self.fetch_pig_price_data(start_date, end_date)
if not data:
return
plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
ax = plt.gca()
ax.set_facecolor('#f8fcfa')
dates = [item["日期"] for item in data]
regions = [
("东北", "#40a9ff"),
("华南", "#73d13d"),
("华", "#ffa940"),
("华中", "#ff4d4f"),
("华东", "#9254de"),
("西南", "#36cfc9")
]
for region, color in regions:
values = [item[region] for item in data]
plt.plot(dates, values, color=color, marker='o',
markersize=4, linewidth=2, label=region)
plt.title('"瘦肉型条猪肉出厂价格指数"区域走势图',
pad=20, fontsize=12, loc='left')
plt.grid(True, linestyle='--', alpha=0.3)
plt.xticks(rotation=45)
plt.legend(loc='upper right', frameon=False)
plt.tight_layout()
plt.savefig(
os.path.join(self.output_dir, "猪肉价格区域走势图.png"),
dpi=300,
bbox_inches='tight'
)
plt.close()
self.logger.info("猪肉价格区域走势图已生成")
except Exception as e:
self.logger.error(f"生成猪肉价格区域走势图失败: {str(e)}")
def plot_grain_price_trend(self, start_date, end_date):
"""绘制粮油价格走势图"""
try:
data = self.fetch_grain_price_data(start_date, end_date)
if not data:
return
plt.figure(figsize=(12, 6), facecolor='#f8fcfa')
ax = plt.gca()
ax.set_facecolor('#f8fcfa')
dates = [item["日期"] for item in data]
indices = [
("通义粮价指数", "#40a9ff"),
("通义粮市指数", "#73d13d"),
("通义粮市第1号", "#ffa940"),
("通义粮天指数", "#ff4d4f"),
("通义粮指", "#9254de"),
("通义粮天指数(干粮)", "#36cfc9")
]
for name, color in indices:
values = [item[name] for item in data]
plt.plot(dates, values, color=color, marker='o',
markersize=4, linewidth=2, label=name)
plt.title('中国通义粮油发价格指数走势图',
pad=20, fontsize=12, loc='left')
plt.grid(True, linestyle='--', alpha=0.3)
plt.xticks(rotation=45)
plt.legend(loc='upper right', frameon=False)
plt.tight_layout()
plt.savefig(
os.path.join(self.output_dir, "粮油价格指数走势图.png"),
dpi=300,
bbox_inches='tight'
)
plt.close()
self.logger.info("粮油价格指数走势图已生成")
except Exception as e:
self.logger.error(f"生成粮油价格指数走势失败: {str(e)}")
def export_data(self, start_date, end_date, format='excel'):
"""导出数据
Args:
start_date: 开始日期
end_date: 结束日期
format: 导出格式,支持 'excel'/'csv'/'json'
"""
try:
# 获取数据
data = {
'index': self.fetch_index_data(start_date, end_date),
'pig': self.fetch_pig_price_data(start_date, end_date),
'grain': self.fetch_grain_price_data(start_date, end_date)
}
if not any(data.values()):
return
# 根据格式导出
if format == 'excel':
self._export_excel(data, start_date, end_date)
elif format == 'csv':
self._export_csv(data, start_date, end_date)
elif format == 'json':
self._export_json(data, start_date, end_date)
else:
self.logger.error(f"不支持的导出格式: {format}")
except Exception as e:
self.logger.error(f"导出数据失败: {str(e)}")
def _clean_text(self, text):
"""清理文本内容"""
if not text:
return ""
# 去除多余空白字符
text = ' '.join(text.split())
# 修复可能的标点符号问题
text = text.replace('。。', '。').replace(',。', '。').replace(';。', '。')
# 修复中文编码
text = text.encode('utf-8').decode('utf-8', 'ignore')
return text
def _validate_report_data(self, report):
"""验证报告数据完整性"""
required_fields = [
"indexConclusion",
"animalConclusion",
"aquaticConclusion",
"vegetablesConclusion",
"fruitsConclusion"
]
is_valid = True
for field in required_fields:
if not report.get(field):
self.logger.warning(f"报告缺少 {field} 数据")
is_valid = False
report[field] = "暂无数据"
return is_valid
def _export_excel(self, data, start_date, end_date):
"""导出Excel数据"""
try:
filename = f"价格数据_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}.xlsx"
filepath = os.path.join(self.output_dir, filename)
with pd.ExcelWriter(filepath) as writer:
# 导出价格指数
if data.get('index'):
df_index = pd.DataFrame(data['index'])
df_index.to_excel(writer, sheet_name='价格指数', index=False)
# 导出猪肉价格
if data.get('pig'):
df_pig = pd.DataFrame(data['pig'])
df_pig.to_excel(writer, sheet_name='猪肉价格', index=False)
# 导出粮油价格
if data.get('grain'):
df_grain = pd.DataFrame(data['grain'])
df_grain.to_excel(writer, sheet_name='粮油价格', index=False)
self.logger.info(f"���据已导出至: {filepath}")
return True
except Exception as e:
self.logger.error(f"导出Excel失败: {str(e)}")
return False
def fetch_all_categories(self):
"""获取所有品类数据"""
categories = {
'MH': '猪肉',
'SC': '蔬菜',
'SG': '水果',
'TL': '粮油',
'SC': '水产',
'DJ': '蛋鸡',
'NR': '牛肉',
'YR': '羊肉'
}
result = {}
for code, name in categories.items():
try:
url = f"{API_BASE_URL}{API_ENDPOINTS['variety_list']}"
params = {'pid': code}
response = self._make_request(url, method='post', params=params)
if response and response.json().get("code") == 200:
result[name] = response.json().get("data", [])
except Exception as e:
self.logger.error(f"获取{name}品类数据失败: {str(e)}")
return result
def fetch_market_prices(self, market_id=None, variety_id=None, start_date=None, end_date=None):
"""获取市场价格数据"""
try:
url = f"{API_BASE_URL}{API_ENDPOINTS['daily_price']}"
params = {
'marketId': market_id,
'varietyId': variety_id,
'startDate': start_date.strftime("%Y-%m-%d") if start_date else None,
'endDate': end_date.strftime("%Y-%m-%d") if end_date else None
}
response = self._make_request(url, method='post', params=params)
if response and response.json().get("code") == 200:
return response.json().get("data", [])
return None
except Exception as e:
self.logger.error(f"获取市场价格数据失败: {str(e)}")
return None
def fetch_analysis_reports(self, page=1, page_size=10):
"""获取分析报告列表"""
try:
url = f"{API_BASE_URL}{API_ENDPOINTS['analysis_report']}"
params = {
'pageNum': page,
'pageSize': page_size
}
response = self._make_request(url, method='post', params=params)
if response and response.json().get("code") == 200:
return response.json().get("data", {}).get("list", [])
return None
except Exception as e:
self.logger.error(f"获取分析报告失败: {str(e)}")
return None
def crawl_all_data(self, start_date, end_date):
"""爬取所有数据"""
try:
# 获取所有品类
categories = self.fetch_all_categories()
# 获取所有市场
markets_response = self._make_request(
f"{API_BASE_URL}{API_ENDPOINTS['market_list']}",
method='post'
)
markets = markets_response.json().get("data", []) if markets_response else []
# 存储结果
results = {
'categories': categories,
'markets': markets,
'prices': {},
'reports': []
}
# 获取每个品类的价格数据
for category, varieties in categories.items():
results['prices'][category] = {}
for variety in varieties:
variety_id = variety.get('id')
if variety_id:
prices = self.fetch_market_prices(
variety_id=variety_id,
start_date=start_date,
end_date=end_date
)
results['prices'][category][variety.get('name')] = prices
# 获取分析报告
page = 1
while True:
reports = self.fetch_analysis_reports(page=page)
if not reports:
break
results['reports'].extend(reports)
page += 1
return results
except Exception as e:
self.logger.error(f"爬取所有数据失败: {str(e)}")
return None
def fetch_weekly_report_content(self, report_id=None):
"""获取周度报告内容"""
try:
url = f"{API_BASE_URL}/product/analysis-report/getReportContent"
params = {'id': report_id} if report_id else None
response = self._make_request(url, method='post', params=params)
if not response:
return None
# 解析HTML内容
soup = BeautifulSoup(response.text, 'html.parser')
# 提取报告基本信息
title = soup.find('h1', class_='report-title').text.strip()
date = soup.find('div', class_='report-date').text.strip()
source = soup.find('div', class_='report-source').text.strip()
# 提取报告主体内容
content = soup.find('div', class_='report-content')
# 提取表格数据
tables = []
for table in content.find_all('table'):
df = pd.read_html(str(table))[0]
tables.append(df.to_dict('records'))
# 提取文本内容
paragraphs = []
for p in content.find_all('p'):
text = p.text.strip()
if text:
paragraphs.append(text)
return {
'title': title,
'date': date,
'source': source,
'content': {
'text': paragraphs,
'tables': tables
}
}
except Exception as e:
self.logger.error(f"获取报告内容失败: {str(e)}")
return None
def crawl_all_reports(self, start_date=None, end_date=None):
"""爬取所有报告"""
try:
reports = []
page = 1
while True:
# 获取报告列表
report_list = self.fetch_analysis_reports(page=page)
if not report_list:
break
# 过滤日期范围
if start_date or end_date:
filtered_reports = []
for report in report_list:
report_date = datetime.strptime(report['publishDate'], '%Y-%m-%d')
if start_date and report_date < start_date:
continue
if end_date and report_date > end_date:
continue
filtered_reports.append(report)
report_list = filtered_reports
# 获取每个报告的详细内容
for report in report_list:
report_content = self.fetch_weekly_report_content(report['id'])
if report_content:
reports.append({
'meta': report,
'content': report_content
})
# 添加延时避免请求过快
time.sleep(1)
page += 1
return reports
except Exception as e:
self.logger.error(f"爬取报告失败: {str(e)}")
return None
def save_reports(self, reports, output_dir='reports'):
"""保存报告到文件"""
try:
if not os.path.exists(output_dir):
os.makedirs(output_dir)
for report in reports:
# 生成文件名
date = datetime.strptime(report['meta']['publishDate'], '%Y-%m-%d')
filename = f"{date.strftime('%Y%m%d')}_{report['meta']['id']}.json"
filepath = os.path.join(output_dir, filename)
# 保存为JSON文件
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
self.logger.error(f"保存报告失败: {str(e)}")
return False
if __name__ == "__main__":
try:
# 检查并安装依赖包
check_and_install_packages()
# 运行爬虫
crawler = ReportCrawler()
crawler.run()
except KeyboardInterrupt:
print("\n程序已被用中断")
sys.exit(0)
except Exception as e:
print(f"\n程序运行出错: {str(e)}")
sys.exit(1)
一、功能介绍
本程序用于爬取农业农村部发布的农产品价格监测报告,包括以下功能:
1. 日度报告
- 生成今日分析报告
- 生成指定日期报告
- 包含价格指数、分品类分析等
2. 周度报告
- 生成本周分析报告
- 生成指定周报告
- 汇总周内价格变化
3. 价格走势
- 农产品价格200指数走势
- 猪肉价格全国走势
- 猪肉价格区域走势
- 粮油价格指数走势
4. 数据导出
- 支持Excel格式导出
- 包含多个数据分类
- 支持时间范围选择
二、使用说明
1. 环境要求
- Python 3.7+
- 依赖包会自动安装:
- requests: HTTP请求
- pandas: 数据处理
- matplotlib: 绘图支持
- urllib3: HTTP客户端
- openpyxl: Excel支持
- colorama: 控制台颜色
2. 运行方法
python
直接运行程序
python report_crawler.py
3. 功能菜单
农产品价格报告爬虫系统
生成今日分析报告
生成本周分析报告
生成指定日期报告
生成指定周报告
生成价格指数走势图
生成猪肉价格走势图
生成区域价格走势图
生成粮油价格走势图
导出Excel数据
退出系统
4. 输出文件
- reports/daily: 日度分析报告
- reports/weekly: 周度分析报告
- reports: 价格走势图和Excel数据
三、数据来源
- 农业农村部市场信息中心
- 数据更新频率: 每日14:00
四、注意事项
- 首次运行会自动检查并安装依赖包
- 所有数据仅供学习交流使用
- 建议使用时设置合理的时间范围
- 如遇到问题可查看日志文件
五、更新记录
v1.0.0 (2024-12-05)
- 实现基础数据爬取功能
- 支持生成分析报告
- 支持绘制价格走势图
- 支持导出Excel数据
六、联系方式
作者: xiaohai
仅用于学习交流