一个py文件搞定mysql查询+Json转换+表数据提取+根据数据条件生成excel文件+打包运行一条龙
import os
import argparse
import pymssql
import json
import pandas as pd
from datetime import datetime
from pandas.io.formats.excel import ExcelFormatter
import openpyxl
# 投注类型映射字典
BET_MAPPING = {
1: 'WIN', 2: 'PLA', 3: 'QIN', 4: 'QPL',
5: 'DBL', 6: 'TCE', 7: 'QTT', 8: 'D-Q',
9: 'TBL', 10: 'T-T', 11: '6UP', 12: 'D-T',
13: 'TRI', 14: 'FCT', 17: 'F-F'
}
Meetingloc = {'ST':1, 'HV':2, 'S1':5, 'S2':6,
'S3':7, 'S4':8, 'S5':9}
def convert_pool_bitmap(pool_list):
"""将数字列表转换为投注类型"""
return [BET_MAPPING.get(num, f'未知({num})') for num in pool_list]
def convert_meeting_loc(venue_list):
"""将数字列表转换为 meeting 类型"""
return Meetingloc.get(venue_list)
def mains(date):
# 数据库配置(需要根据实际情况修改)
server = 'your server'
user = 'your user'
password = 'your pwd'
database = 'your DB name'
port = 'your port'
# 准备数据容器
output_data = []
try:
# 建立数据库连接
conn = pymssql.connect(
server=server,
user=user,
password=password,
database=database,
port=port
)
sql = """
SELECT *
FROM wc2wca_col_log
WHERE
msg_details LIKE %s
AND msg_code = %s
AND biz_date = %s
AND msg_type = %s
ORDER BY msg_time desc
"""
# 定义查询参数
for i in Meetingloc.keys():
like_pattern = '%"meetingID":{"meetingLoc":' + str(Meetingloc.get(i)) + ',"meetingDayCode":[1-9]},"raceNo":1%'
# like_pattern = '%"meetingID":{"meetingLoc":8,"meetingDayCode":[1-9]},"raceNo":1%'
params = (
like_pattern, # LIKE参数
'1120', # msg_code
date, # biz_date
'1' # msg_type
)
# 创建游标执行查询
with conn.cursor(as_dict=True) as cursor:
cursor.execute(sql,params)
for row in cursor:
try:
# 解析JSON数据
json_data = json.loads(row['msg_details'])
# 提取目标字段
race_no = json_data['Data']['raceNo']
pool_bitmap = json_data['Data']['poolBitmap']
msg_time = row['msg_time']
# 转换投注类型
converted_pool = convert_pool_bitmap(pool_bitmap)
# 存储到数据集
output_data.append({
'meeting':i,
'time': msg_time.strftime('%Y-%m-%d %H:%M:%S.%f'), # 格式化时间
'interval': '',
'raceNo': race_no,
'pool': ', '.join(converted_pool) # 列表转字符串
})
except (KeyError, json.JSONDecodeError) as e:
print(f"数据处理异常: {str(e)}")
continue
# 数据处理: 将 S1 S2 S3 S4 S5 S6 ST HV 数据分组
if output_data:
groups = {}
for item in output_data:
meeting = item.get('meeting')
if meeting in Meetingloc.key():
if meeting not in groups:
groups[meeting] = []
groups[meeting].append(item)
# 自定义输出路径
output_dir = "./reports" # 自定义输出路径
os.makedirs(output_dir, exist_ok=True)
filename = f"RaceData_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" # 生成文件名(带时间戳)
filename = os.path.join(output_dir, filename)
# 创建Excel文件
with pd.ExcelWriter(filename, engine='openpyxl') as writer:
for meeting_name, records in groups.items():
df = pd.DataFrame(records, columns=['meeting','time','interval', 'raceNo', 'pool']) #指定顺序 df = df[['meeting','time','interval', 'RaceNo', 'Pool']]
df["time"] = pd.to_datetime(df["time"]) # 转换为时间类型
# df = df.sort_values(by="time").reset_index(drop=True)
# 计算时间差(秒)
df["Next_Time"] = df["time"].shift(-1)
df["interval"] = (df["Next_Time"] - df["time"]).dt.total_seconds()
df.drop("Next_Time", axis=1, inplace=True)
# 处理最后一条记录的NaN值
df["interval"] = df["interval"].fillna(0)
df.to_excel(
writer,
sheet_name=meeting_name+str(date),
index=False,
startrow=0,
startcol=0)
# 获取工作表对象writer
# worksheet = writer.sheets.get(meeting_name)
worksheet = writer.sheets[meeting_name+str(date)]
# 设置列宽(单位:字符)
worksheet.column_dimensions['A'].width = 10 # MEETING
worksheet.column_dimensions['B'].width = 30 # 时间列
worksheet.column_dimensions['C'].width = 10 # 间隔
worksheet.column_dimensions['D'].width = 10 # RaceNo列
worksheet.column_dimensions['E'].width = 60 # Pool类型列
# 设置标题行样式
header_style = {
'font': {'bold': True, 'color': 'FFFFFF'},
'fill': {'fill_type': 'solid', 'start_color': '4F81BD'},
'alignment': {'horizontal': 'center'}
}
for cell in worksheet[1]: # 第一行为标题行
cell.font = openpyxl.styles.Font(**header_style['font'])
cell.fill = openpyxl.styles.PatternFill(**header_style['fill'])
cell.alignment = openpyxl.styles.Alignment(**header_style['alignment'])
# 自动调整行高
for row in worksheet.iter_rows():
for cell in row:
worksheet.row_dimensions[cell.row].height = 20
print(f"数据已成功导出到 {filename}")
except pymssql.Error as e:
print(f"数据库错误: {str(e)}")
finally:
if conn:
conn.close()
def process_date(date_str):
"""
处理日期参数的函数
:param date_str: 必传参数(格式示例:20240325)
"""
# 验证参数格式(可选)
if len(date_str) != 8 or not date_str.isdigit():
raise ValueError("日期格式应为8位数字,例如:20240325")
# 示例处理结果
year = date_str[:4]
month = date_str[4:6]
day = date_str[6:]
print(f"解析结果:{year}年{month}月{day}日")
# 这里添加你的业务逻辑
print(f"成功接收日期参数:{date_str}")
print("正在处理...")
# 这里添加你的业务逻辑
mains(date_str)
def main(): # [!] 关键点:确保这里没有参数
# 创建命令行参数解析器
parser = argparse.ArgumentParser(description='日期处理程序')
parser.add_argument('date',
type=str,
help='必传的日期参数(8位数字,示例:20240325)')
args = parser.parse_args()
process_date(args.date)
if __name__ == "__main__":
process_date('20240328')
最后运行:
一、cmd直接运行脚本测试:
python date_app.py 20240325
二、打包:打包问题:‘“indexerror: tuple index out of range” 可以参考连接 , 一般问题可以参考连接
pyinstaller --onefile date_app.py
三、生成dist文件后,会生成exe文件
在dist文件夹里面新建reports文件夹:报告文件夹
四、测试打包后的程序
1、新建start.bat空文件
2、放入代码:
start PMUCOL_LOG.exe 20240328
3、运行start.bat
五、生成报告:.\reports\xxxx.xlsx
备注:至于生成Excel文件的代码,想看数据格式的。可以参考