当前位置: 首页 > article >正文

一个py文件搞定mysql查询+Json转换+表数据提取+根据数据条件生成excel文件+打包运行一条龙



import os
import argparse
import pymssql
import json
import pandas as pd
from datetime import datetime
from pandas.io.formats.excel import ExcelFormatter
import openpyxl

# 投注类型映射字典
BET_MAPPING = {
    1: 'WIN', 2: 'PLA', 3: 'QIN', 4: 'QPL',
    5: 'DBL', 6: 'TCE', 7: 'QTT', 8: 'D-Q',
    9: 'TBL', 10: 'T-T', 11: '6UP', 12: 'D-T',
    13: 'TRI', 14: 'FCT', 17: 'F-F'
}


Meetingloc = {'ST':1, 'HV':2, 'S1':5, 'S2':6,
    'S3':7, 'S4':8, 'S5':9}


def convert_pool_bitmap(pool_list):
    """将数字列表转换为投注类型"""
    return [BET_MAPPING.get(num, f'未知({num})') for num in pool_list]

def convert_meeting_loc(venue_list):
    """将数字列表转换为 meeting 类型"""
    return Meetingloc.get(venue_list)


def mains(date):
    # 数据库配置(需要根据实际情况修改)
    server = 'your server'
    user = 'your user'
    password = 'your pwd'
    database = 'your DB name'
    port = 'your port'

    # 准备数据容器
    output_data = []

    try:
        # 建立数据库连接
        conn = pymssql.connect(
            server=server,
            user=user,
            password=password,
            database=database,
            port=port
        )
        sql = """
                SELECT * 
                FROM wc2wca_col_log  
                WHERE 
                    msg_details LIKE %s 
                    AND msg_code = %s 
                    AND biz_date = %s 
                    AND msg_type = %s
                    ORDER BY msg_time desc
                """
        # 定义查询参数

        for i in Meetingloc.keys():
            like_pattern = '%"meetingID":{"meetingLoc":' + str(Meetingloc.get(i)) + ',"meetingDayCode":[1-9]},"raceNo":1%'
        # like_pattern = '%"meetingID":{"meetingLoc":8,"meetingDayCode":[1-9]},"raceNo":1%'
            params = (
                like_pattern,  # LIKE参数
                '1120',  # msg_code
                date,  # biz_date
                '1'  # msg_type
            )

        # 创建游标执行查询
            with conn.cursor(as_dict=True) as cursor:
                cursor.execute(sql,params)

                for row in cursor:
                    try:
                        # 解析JSON数据
                        json_data = json.loads(row['msg_details'])

                        # 提取目标字段
                        race_no = json_data['Data']['raceNo']
                        pool_bitmap = json_data['Data']['poolBitmap']
                        msg_time = row['msg_time']

                        # 转换投注类型
                        converted_pool = convert_pool_bitmap(pool_bitmap)

                        # 存储到数据集
                        output_data.append({
                            'meeting':i,
                            'time': msg_time.strftime('%Y-%m-%d %H:%M:%S.%f'),  # 格式化时间
                            'interval': '',
                            'raceNo': race_no,
                            'pool': ', '.join(converted_pool)  # 列表转字符串
                        })

                    except (KeyError, json.JSONDecodeError) as e:
                        print(f"数据处理异常: {str(e)}")
                        continue
        # 数据处理: 将 S1 S2 S3 S4 S5 S6 ST HV 数据分组
        if output_data:
            groups = {}
            for item in output_data:
                meeting = item.get('meeting')
                if meeting in Meetingloc.key():
                    if meeting not in groups:
                        groups[meeting] = []
                    groups[meeting].append(item)

            # 自定义输出路径
            output_dir = "./reports"  # 自定义输出路径
            os.makedirs(output_dir, exist_ok=True)
            filename = f"RaceData_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"  # 生成文件名(带时间戳)
            filename = os.path.join(output_dir, filename)

            # 创建Excel文件
            with pd.ExcelWriter(filename, engine='openpyxl') as writer:
                for meeting_name, records in groups.items():
                    df = pd.DataFrame(records, columns=['meeting','time','interval', 'raceNo', 'pool'])   #指定顺序  df = df[['meeting','time','interval', 'RaceNo', 'Pool']]
                    df["time"] = pd.to_datetime(df["time"])  # 转换为时间类型
                    # df = df.sort_values(by="time").reset_index(drop=True)

                    # 计算时间差(秒)
                    df["Next_Time"] = df["time"].shift(-1)
                    df["interval"] = (df["Next_Time"] - df["time"]).dt.total_seconds()
                    df.drop("Next_Time", axis=1, inplace=True)

                    # 处理最后一条记录的NaN值
                    df["interval"] = df["interval"].fillna(0)

                    df.to_excel(
                        writer,
                        sheet_name=meeting_name+str(date),
                        index=False,
                        startrow=0,
                        startcol=0)

                    # 获取工作表对象writer
                    # worksheet = writer.sheets.get(meeting_name)
                    worksheet = writer.sheets[meeting_name+str(date)]


                    # 设置列宽(单位:字符)
                    worksheet.column_dimensions['A'].width = 10  # MEETING
                    worksheet.column_dimensions['B'].width = 30  # 时间列
                    worksheet.column_dimensions['C'].width = 10  # 间隔
                    worksheet.column_dimensions['D'].width = 10  # RaceNo列
                    worksheet.column_dimensions['E'].width = 60  # Pool类型列

                    # 设置标题行样式
                    header_style = {
                        'font': {'bold': True, 'color': 'FFFFFF'},
                        'fill': {'fill_type': 'solid', 'start_color': '4F81BD'},
                        'alignment': {'horizontal': 'center'}
                    }
                    for cell in worksheet[1]:  # 第一行为标题行
                        cell.font = openpyxl.styles.Font(**header_style['font'])
                        cell.fill = openpyxl.styles.PatternFill(**header_style['fill'])
                        cell.alignment = openpyxl.styles.Alignment(**header_style['alignment'])

                    # 自动调整行高
                    for row in worksheet.iter_rows():
                        for cell in row:
                            worksheet.row_dimensions[cell.row].height = 20

            print(f"数据已成功导出到 {filename}")

    except pymssql.Error as e:
        print(f"数据库错误: {str(e)}")
    finally:
        if conn:
            conn.close()


def process_date(date_str):
    """
    处理日期参数的函数
    :param date_str: 必传参数(格式示例:20240325)
    """
    # 验证参数格式(可选)
    if len(date_str) != 8 or not date_str.isdigit():
        raise ValueError("日期格式应为8位数字,例如:20240325")

    # 示例处理结果
    year = date_str[:4]
    month = date_str[4:6]
    day = date_str[6:]
    print(f"解析结果:{year}{month}{day}日")

    # 这里添加你的业务逻辑
    print(f"成功接收日期参数:{date_str}")
    print("正在处理...")

    # 这里添加你的业务逻辑
    mains(date_str)

def main(): # [!] 关键点:确保这里没有参数
    # 创建命令行参数解析器
    parser = argparse.ArgumentParser(description='日期处理程序')
    parser.add_argument('date',
                       type=str,
                       help='必传的日期参数(8位数字,示例:20240325)')
    args = parser.parse_args()
    process_date(args.date)


if __name__ == "__main__":
    process_date('20240328')

最后运行:

一、cmd直接运行脚本测试:

python date_app.py 20240325

二、打包:打包问题:‘“indexerror: tuple index out of range” 可以参考连接 , 一般问题可以参考连接

pyinstaller --onefile date_app.py

三、生成dist文件后,会生成exe文件
在dist文件夹里面新建reports文件夹:报告文件夹

四、测试打包后的程序
1、新建start.bat空文件
2、放入代码:

start PMUCOL_LOG.exe 20240328

3、运行start.bat

五、生成报告:.\reports\xxxx.xlsx

备注:至于生成Excel文件的代码,想看数据格式的。可以参考


http://www.kler.cn/a/569434.html

相关文章:

  • Spring MVC框架六:Ajax技术
  • Redis面试常见问题——使用场景问题
  • React Portals深度解析:突破组件层级的渲染艺术
  • spring boot打包插件的问题
  • 【Mac】git使用再学习
  • Django应用的高级配置和管理
  • 【Python 数据结构 3.顺序表】
  • python | 2 个删除列表中空字符串元素的方法
  • 【GenBI优化】提升text2sql准确率:建议使用推理大模型,增加重试
  • The First项目报告:VANA如何重塑数据所有权与AI训练
  • Linux上用C++和GCC开发程序实现两个不同PostgreSQL实例下单个数据库中多个Schema稳定高效的数据迁移到其它PostgreSQL实例
  • Redis100道高频面试题
  • python-leetcode 46.从前序与中序遍历序列构造二叉树
  • 西电应用密码学与网络安全实验通关指南
  • Git与GitHub:它们是什么,有什么区别与联系?
  • coze生成的工作流,发布后,利用cmd命令行执行。可以定时发日报,周报等。让他总结你飞书里面的表格。都可以
  • C++对象特性
  • 1.Qt写简单的登录界面(c++)
  • Java---入门基础篇(下)---方法与数组
  • Python----Python爬虫(多线程,多进程,协程爬虫)