当前位置: 首页 > article >正文

多线程显示 CSV 2 PNG 倒计时循环播放

import argparse
import logging
import sys
from pathlib import Path
import time
import os
import pandas as pd
import matplotlib.pyplot as plt
from concurrent.futures import ProcessPoolExecutor, as_completed

# 初始化日志记录
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    handlers=[logging.FileHandler('script.log'), logging.StreamHandler(sys.stdout)])

def process_file(file_path, input_directory, output_directory):
    try:
        # 使用 Pandas 读取 CSV 文件
        data1 = pd.read_csv(file_path)
    except Exception as e:
        logging.error(f"读取文件 {file_path} 时发生错误: {e}")
        return

    # 获取数据的行数和列数
    nRow, nCol = data1.shape

    # 绘制图表
    plt.figure(figsize=(10, 8))
    index = 0
    for i in range(nCol):
        if data1.columns[i] == "datetime":
            continue

        plt.subplot(nCol - 1, 1, index + 1)
        index += 1
        plt.plot(data1.iloc[:, i])
        plt.title(data1.columns[i])

    # 获取当前时间并格式化
    formatted_time = time.strftime('%Y_%m_%d_%H_%M', time.localtime())

    # 修改文件名以包含当前时间,并替换特殊字符
    file_stem = file_path.stem.replace('@', '_').replace(':', '_')
    file_name = formatted_time + '_' + file_stem + '.png'

    # 生成输出路径
    output_path = Path(output_directory) / file_name

    # 保存图像到 Fig 目录
    try:
        plt.savefig(str(output_path))
        logging.info(f"存储图片中... {output_path}")
        plt.close()
    except Exception as e:
        logging.error(f"保存图像 {output_path} 时发生错误: {e}")
        return
    return output_path

def animate_charts(output_directory):
    # 获取最新生成的两个图表文件
    paths = sorted(Path(output_directory).glob('*.png'), key=lambda p: p.stat().st_mtime, reverse=True)
    latest_paths = paths[:2]

    # 设置初始状态
    paused = False

    def on_key_press(event):
        nonlocal paused
        if event.key == ' ':  # 空格键
            paused = not paused
            if paused:
                print("Paused. Press space to continue.")
            else:
                print("Continuing...")

    # 主循环
    while True:
        for path in latest_paths:
            # 显示文件名
            print(f"正在显示文件: {path}")

            if paused:
                plt.connect('key_press_event', on_key_press)
                img = plt.imread(str(path))
                fig, ax = plt.subplots()
                ax.imshow(img)
                ax.axis('off')  # 关闭坐标轴

                creation_time = time.strftime("%Y-%m-%d %H:%M", time.localtime(path.stat().st_ctime))
                ax.set_title(f'File Created at: {creation_time}')

                plt.show(block=False)

                # 显示倒计时
                duration = 55
                text_obj = None
                for remaining in range(duration, 0, -1):
                    if not paused:
                        break
                    time.sleep(1)
                    if text_obj is not None:
                        text_obj.remove()  # 直接移除文本对象
                    text_obj = ax.text(0.5, 0.15, f"Timing: {remaining}s", transform=ax.transAxes, ha='center', va='center', color='red', fontsize=8)
                    fig.canvas.draw_idle()
                    plt.pause(0.001)  # 更新图形界面

                plt.close()

                while paused:
                    time.sleep(0.1)  # 等待恢复
            else:
                img = plt.imread(str(path))
                fig, ax = plt.subplots()
                ax.imshow(img)
                ax.axis('off')  # 关闭坐标轴

                creation_time = time.strftime("%Y-%m-%d %H:%M", time.localtime(path.stat().st_ctime))
                ax.set_title(f'File Created at: {creation_time}')

                plt.show(block=False)

                # 显示倒计时
                duration = 60
                text_obj = None
                for remaining in range(duration, 0, -1):
                    time.sleep(1)
                    if text_obj is not None:
                        text_obj.remove()  # 直接移除文本对象
                    text_obj = ax.text(0.5, 0.15, f"Timing: {remaining}s", transform=ax.transAxes, ha='center', va='center', color='red', fontsize=8)
                    fig.canvas.draw_idle()
                    plt.pause(0.001)  # 更新图形界面

                plt.close()

def main(input_directory, output_directory):
    # 开始运行提示
    logging.info("-----开始运行-----")

    # 定义传感器数据所在的目录
    sensor_dir = Path(input_directory)

    # 获取传感器目录下的所有文件列表
    files = list(sensor_dir.glob('*.csv'))

    # 输出文件列表
    logging.info("即将进行文件读取...")
    for i, f in enumerate(files, start=1):
        logging.info(f"文件读取中 {i}/{len(files)}")
        logging.info(f.name)

    # 定义输出图像的目录
    fig_dir = Path(output_directory)

    # 检查是否存在 Fig 目录,如果不存在则创建
    if not fig_dir.exists():
        try:
            fig_dir.mkdir(parents=True, exist_ok=True)
        except FileExistsError:
            logging.info("目录已存在。")

    # 打印当前时间
    current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
    logging.info(f"读取完毕,读取时间: {current_time}  ^_^ 存储图片进行中... ")

    # 使用并行处理
    with ProcessPoolExecutor() as executor:
        futures = []
        for file_path in files:
            future = executor.submit(process_file, file_path, input_directory, output_directory)
            futures.append(future)

        # 等待所有任务完成
        for future in as_completed(futures):
            future.result()

    # 结束提示
    logging.info("存储完毕")

    # 显示最近两张图片
    animate_charts(output_directory)

if __name__ == "__main__":
    # 定义默认配置值
    input_directory = 'C:\\Users\\Administrator\\Desktop\\Sensor'
    output_directory = 'C:\\Users\\Administrator\\Desktop\\Fig'

    # 解析命令行参数
    parser = argparse.ArgumentParser(description="Process CSV files and generate charts.")
    parser.add_argument('--input_directory', type=str, default=input_directory, help='Input directory containing CSV files.')
    parser.add_argument('--output_directory', type=str, default=output_directory, help='Output directory to save generated charts.')
    args = parser.parse_args()

    # 主函数调用
    main(args.input_directory, args.output_directory)


    


http://www.kler.cn/a/372308.html

相关文章:

  • RocketMQ面试题:进阶部分
  • 自组织映射 (Self-Organizing Map, SOM) 算法详解与PyTorch实现
  • 数据挖掘——聚类
  • C# 服务应用研究
  • STLG_01_05_程序设计C语言 - 数据类型概念解析
  • springboot实战(19)(条件分页查询、PageHelper、MYBATIS动态SQL、mapper映射配置文件、自定义类封装分页查询数据集)
  • 低功耗模组学习指南:从入门到精通通过MQTT连接实现远程控制
  • 如何在不同设备上轻松下载Facebook应用:全面指南
  • AI助力医疗数据自动化:诊断报告识别与管理
  • TCP全连接队列与 tcpdump 抓包
  • vue点击菜单,出现2个相同tab,啥原因
  • 代码备份管理 —— Git实用操作
  • Spring Boot框架下的酒店住宿登记系统
  • Centos如何卸载docker
  • WPF中视觉树和逻辑树的区别和联系
  • HTML入门教程2:HTML发展历史
  • 分布式数据库技术金融应用规范技术架构
  • java 中 List<T> 类型数据在 postgreSql 数据库中存储
  • 有效沟通与系统思考
  • 数据结构和算法-动态规划(3)-经典问题
  • Springboot整合RocketMQ分布式事务
  • 博科交换机SNMP采集(光衰)信息
  • 【Hive复杂数据类型和函数】全网总结最全的Hive函数
  • Next.js、Prisma 和 MySQL 实践示例
  • js获取浏览器指纹
  • rabbitmq高级特性(1):消息确认,持久性,发送方确认和重试机制