
Working with Databases in Python: Reading ClickHouse Data into a CSV File

import pandas as pd
from clickhouse_driver import Client
import timeit
import logging
import threading
from threading import Lock
from queue import Queue
from typing import List
from contextlib import contextmanager
import os
import time

# Configuration parameters
CONFIG = {
    'DB': {
        'host': 'xxx',
        'database': 'xxx',
        'user': 'xxxx',
        'password': 'xxxx'
    },
    'BATCH_SIZE': 5000,
    'TOTAL_RECORDS': 1000000,
    'NUM_THREADS': 5,
    'OUTPUT_FILE': 'yyxs_ck2excel_v4.csv',
    'MAX_RETRIES': 3,           # maximum number of retries
    'RETRY_DELAY': 5,           # delay between retries (seconds)
    'CONNECTION_TIMEOUT': 60    # connection timeout (seconds)
}

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s.%(msecs)d - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

class DatabaseManager:
    _thread_local = threading.local()

    @classmethod
    @contextmanager
    def get_connection(cls):
        """Thread-safe database connection manager with connect retries."""
        retry_count = 0
        # Retry only the connection setup. A generator-based context manager
        # must yield exactly once, so the yield stays outside the retry loop;
        # otherwise an exception thrown into the generator could trigger a
        # second yield and raise RuntimeError, masking the real error.
        while not hasattr(cls._thread_local, "client"):
            try:
                cls._thread_local.client = Client(
                    **CONFIG['DB'],
                    connect_timeout=CONFIG['CONNECTION_TIMEOUT']
                )
                logger.info(f"Created new database connection for thread {threading.current_thread().name}")
            except Exception as e:
                retry_count += 1
                logger.error(f"Database connection error (attempt {retry_count}): {str(e)}")
                if retry_count >= CONFIG['MAX_RETRIES']:
                    raise
                time.sleep(CONFIG['RETRY_DELAY'])
        try:
            yield cls._thread_local.client
        except Exception:
            # Drop the cached connection so the next call reconnects cleanly.
            cls._thread_local.client.disconnect()
            delattr(cls._thread_local, "client")
            raise
    
    @classmethod
    def close_all_connections(cls):
        """关闭当前线程的数据库连接"""
        if hasattr(cls._thread_local, "client"):
            cls._thread_local.client.disconnect()
            delattr(cls._thread_local, "client")
            logger.info(f"Closed database connection for thread {threading.current_thread().name}")

class DataProcessor:
    def __init__(self):
        self.columns = [
            "a", "b", "c", "d"
        ]
        self.query = '''
            SELECT
               a,b,c,d
            FROM
                table_name
            ORDER BY
               a,b,c,d 
        '''
        self.file_lock = Lock()         # lock protecting concurrent CSV writes
        self.total_rows = 0             # running total of rows written
        self.processed_batches = set()  # offsets of successfully processed batches
        self.failed_batches = set()     # offsets of failed batches

    def fetch_data_batch(self, batch_size: int, start: int) -> List[tuple]:
        """Fetch one batch of rows, with retries"""
        retry_count = 0
        while retry_count < CONFIG['MAX_RETRIES']:
            try:
                with DatabaseManager.get_connection() as client:
                    query_with_limit = f"{self.query} LIMIT {batch_size} OFFSET {start}"
                    result = client.execute(query_with_limit)
                    logger.info(f"Fetched {len(result)} records starting from {start}.")
                    return result
            except Exception as e:
                retry_count += 1
                logger.error(f"Error fetching batch starting at {start} (attempt {retry_count}): {str(e)}")
                if retry_count < CONFIG['MAX_RETRIES']:
                    time.sleep(CONFIG['RETRY_DELAY'])
                else:
                    raise

    def save_to_csv(self, df: pd.DataFrame, file_name: str, batch_start: int):
        """Append a batch of rows to the CSV file"""
        try:
            with self.file_lock:  # serialize file writes across threads
                file_exists = os.path.exists(file_name) and os.path.getsize(file_name) > 0
                df.to_csv(
                    file_name,
                    mode='a',
                    header=not file_exists,
                    index=False
                )
                self.total_rows += len(df)
                self.processed_batches.add(batch_start)
                logger.info(f"Appended {len(df)} records to {file_name}. Total rows: {self.total_rows}")
        except Exception as e:
            logger.error(f"Error saving to CSV: {str(e)}")
            raise

    def process_batch(self, start: int, batch_size: int, output_file: str):
        """处理单个批次的数据"""
        try:
            if start in self.processed_batches:
                logger.info(f"Batch {start} already processed, skipping.")
                return True

            result_batch = self.fetch_data_batch(batch_size, start)
            df_batch = pd.DataFrame(result_batch, columns=self.columns)
            self.save_to_csv(df_batch, output_file, start)
            return True
        except Exception as e:
            logger.error(f"Error processing batch starting at {start}: {str(e)}")
            self.failed_batches.add(start)
            return False

def main_v1():
    try:
        processor = DataProcessor()
        output_file = CONFIG['OUTPUT_FILE']
        
        # Create (or truncate) the output file
        with open(output_file, 'w', encoding='utf-8') as f:
            pass

        queue = Queue()
        retry_queue = Queue()  # holds failed batches for retry
        threads = []

        def worker():
            while True:
                start = queue.get()
                if start is None:  # shutdown signal
                    queue.task_done()
                    break
                try:
                    success = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)
                    if not success:
                        retry_queue.put(start)
                except Exception as e:
                    logger.error(f"Worker thread error: {str(e)}")
                    retry_queue.put(start)
                finally:
                    queue.task_done()  # exactly once per dequeued item

        # Start worker threads
        for _ in range(CONFIG['NUM_THREADS']):
            t = threading.Thread(target=worker)
            t.daemon = True
            t.start()
            threads.append(t)

        # Enqueue one task per batch offset
        for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):
            queue.put(start)

        # Wait for the main pass to complete
        queue.join()

        # Retry failed batches
        while not retry_queue.empty():
            start = retry_queue.get()
            logger.info(f"Retrying failed batch starting at {start}")
            if processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file):
                logger.info(f"Successfully retried batch {start}")
            else:
                logger.error(f"Failed to process batch {start} after retries")

        # Signal all threads to stop
        for _ in threads:
            queue.put(None)
        for t in threads:
            t.join()

        # Final summary
        logger.info(f"Processing completed. Total rows: {processor.total_rows}")
        logger.info(f"Processed batches: {len(processor.processed_batches)}")
        logger.info(f"Failed batches: {len(processor.failed_batches)}")
        
        if processor.failed_batches:
            logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")

    except Exception as e:
        logger.error(f"Main process error: {str(e)}")
        raise
    finally:
        DatabaseManager.close_all_connections()

def main():
    try:
        processor = DataProcessor()
        output_file = CONFIG['OUTPUT_FILE']
        
        # Create (or truncate) the output file
        with open(output_file, 'w', encoding='utf-8') as f:
            pass

        queue = Queue()
        retry_queue = Queue()
        threads = []

        def worker():
            while True:
                try:
                    start = queue.get()
                    if start is None:  # shutdown signal
                        queue.task_done()
                        break
                    
                    try:
                        success = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)
                        if not success:
                            retry_queue.put(start)
                    except Exception as e:
                        logger.error(f"Error processing batch at offset {start}: {str(e)}")
                        retry_queue.put(start)
                    finally:
                        queue.task_done()  # called exactly once per dequeued item
                except Exception as e:
                    logger.error(f"Worker thread error: {str(e)}")
                    # do not call queue.task_done() here

        # Start worker threads
        for _ in range(CONFIG['NUM_THREADS']):
            t = threading.Thread(target=worker)
            t.daemon = True
            t.start()
            threads.append(t)

        # Enqueue one task per batch offset
        total_batches = (CONFIG['TOTAL_RECORDS'] + CONFIG['BATCH_SIZE'] - 1) // CONFIG['BATCH_SIZE']
        for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):
            queue.put(start)

        # Wait for the main pass to complete
        queue.join()

        # Retry failed batches
        retry_count = 0
        max_retries = 3
        while not retry_queue.empty() and retry_count < max_retries:
            retry_count += 1
            retry_size = retry_queue.qsize()
            logger.info(f"Retrying {retry_size} failed batches (attempt {retry_count})")
            
            # Put failed batches back on the main queue
            for _ in range(retry_size):
                start = retry_queue.get()
                queue.put(start)
            
            # Wait for retries to complete
            queue.join()

        # Signal all threads to stop
        for _ in threads:
            queue.put(None)
        for t in threads:
            t.join()

        # Final summary
        logger.info(f"Processing completed. Total rows: {processor.total_rows}")
        logger.info(f"Expected batches: {total_batches}")
        logger.info(f"Processed batches: {len(processor.processed_batches)}")
        logger.info(f"Failed batches: {len(processor.failed_batches)}")
        
        if processor.failed_batches:
            logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")

        # Validate data integrity
        try:
            df_final = pd.read_csv(output_file)
            actual_rows = len(df_final)
            logger.info(f"Final CSV file contains {actual_rows} rows")
            
            if actual_rows != processor.total_rows:
                logger.warning(f"Row count mismatch: CSV has {actual_rows} rows, but processed {processor.total_rows} rows")
                
            # Check for duplicated header rows
            duplicate_headers = df_final[df_final.iloc[:, 0] == df_final.columns[0]]
            if not duplicate_headers.empty:
                logger.warning(f"Found {len(duplicate_headers)} duplicate headers at rows: {duplicate_headers.index.tolist()}")
                # Remove duplicated header rows
                df_final = df_final[df_final.iloc[:, 0] != df_final.columns[0]]
                df_final.to_csv(output_file, index=False)
                logger.info(f"Cleaned CSV file now contains {len(df_final)} rows")
                
        except Exception as e:
            logger.error(f"Error validating final CSV file: {str(e)}")

    except Exception as e:
        logger.error(f"Main process error: {str(e)}")
        raise
    finally:
        DatabaseManager.close_all_connections()

if __name__ == "__main__":
    start_time = timeit.default_timer()
    try:
        main()
        elapsed_time = timeit.default_timer() - start_time
        logger.info(f"数据提取和存储完成,耗时: {elapsed_time:.2f} 秒")
    except Exception as e:
        logger.error(f"程序执行失败: {str(e)}")
        raise
1. Main Classes
  • DatabaseManager

A thread-safe manager for database connections. It uses threading.local() so that each thread gets its own connection, and it includes retry logic and connection cleanup. A minimal usage sketch follows.
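
For illustration, here is a minimal sketch of how a worker thread might use the context manager (the table and column names are the placeholders from the query above):

def fetch_sample_rows():
    # Borrow this thread's cached connection; reconnects with retries if needed.
    with DatabaseManager.get_connection() as client:
        return client.execute("SELECT a, b, c, d FROM table_name LIMIT 10")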

  • DataProcessor

The core processing class. It defines the output columns and the query, fetches and saves data in batches, and tracks processed and failed batches. A single-batch usage sketch follows.
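
As a quick sketch of the class on its own, this processes a single batch (offset 0) without the thread pool; the output file name here is only an example:

def demo_single_batch():
    processor = DataProcessor()
    # Fetch the first BATCH_SIZE rows and append them to a demo CSV.
    ok = processor.process_batch(0, CONFIG['BATCH_SIZE'], 'demo_batch.csv')
    print('batch succeeded:', ok)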

2. Workflow

  • Initialization

Create (or truncate) the output file; initialize the worker threads and the task queue.

  • Data processing

The total record count is split into fixed-size batches, and multiple worker threads process them in parallel. Each batch is:

  • fetched from the database
  • converted to a DataFrame
  • appended to the CSV file

A note on the pagination strategy used here follows this list.
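
One design caveat: LIMIT ... OFFSET pagination makes ClickHouse re-read and re-sort the skipped rows for every batch, so deep offsets get progressively slower. A hedged alternative sketch is keyset pagination, assuming (this is an assumption, not part of the original schema) that column a is a unique, monotonically increasing key:

def fetch_after(client, last_a, batch_size):
    # Keyset pagination: seek past the last key seen instead of using OFFSET.
    # Assumes 'a' is unique and increasing -- adapt to the real schema.
    return client.execute(
        "SELECT a, b, c, d FROM table_name"
        " WHERE a > %(last_a)s ORDER BY a LIMIT %(limit)s",
        {'last_a': last_a, 'limit': batch_size},
    )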
  • Error handling

Failed batches go into a retry queue and are retried at most 3 times; every failed batch is recorded and logged.

  • Data validation

Check the row count of the final CSV file, detect and clean duplicated header rows, and verify data integrity. A lighter-weight row-count sketch follows.
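
Note that the validation in main() loads the entire CSV with pd.read_csv, which can be memory-heavy for a large export. A minimal alternative sketch using pandas' chunked reader counts rows without holding the whole file in memory:

def count_csv_rows(path, chunk_size=100_000):
    # Stream the CSV in chunks and sum the row counts.
    total = 0
    for chunk in pd.read_csv(path, chunksize=chunk_size):
        total += len(chunk)
    return total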

3. Features

  • Thread safety

Thread-local storage for database connections; file writes protected by a lock.

  • Fault tolerance

Connection retries, batch-level retries, and detailed logging.

  • Performance

Batched queries, multi-threaded parallel processing, and queue-based task management.

  • Monitoring and logging

Detailed logs, progress tracking, and execution-time statistics.

Overall, this program is well suited to exporting large volumes of data, with good fault tolerance and reliability.

