Python 操作数据库:读取 Clickhouse 数据存入csv文件
import pandas as pd
from clickhouse_driver import Client
import timeit
import logging
import threading
from threading import Lock
from queue import Queue
from typing import List, Dict, Set
from contextlib import contextmanager
import os
import time
# 配置参数
CONFIG = {
'DB': {
'host': 'xxx',
'database': 'xxx',
'user': 'xxxx',
'password': 'xxxx'
},
'BATCH_SIZE': 5000,
'TOTAL_RECORDS': 1000000,
'NUM_THREADS': 5,
'OUTPUT_FILE': 'yyxs_ck2excel_v4.csv',
'MAX_RETRIES': 3, # 最大重试次数
'RETRY_DELAY': 5, # 重试延迟(秒)
'CONNECTION_TIMEOUT': 60 # 连接超时时间(秒)
}
# 设置日志记录
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s.%(msecs)d - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
class DatabaseManager:
_thread_local = threading.local()
@classmethod
@contextmanager
def get_connection(cls):
"""线程安全的数据库连接管理器"""
retry_count = 0
while retry_count < CONFIG['MAX_RETRIES']:
try:
if not hasattr(cls._thread_local, "client"):
cls._thread_local.client = Client(
**CONFIG['DB'],
connect_timeout=CONFIG['CONNECTION_TIMEOUT']
)
logger.info(f"Created new database connection for thread {threading.current_thread().name}")
yield cls._thread_local.client
break
except Exception as e:
retry_count += 1
logger.error(f"Database connection error (attempt {retry_count}): {str(e)}")
if hasattr(cls._thread_local, "client"):
cls._thread_local.client.disconnect()
delattr(cls._thread_local, "client")
if retry_count < CONFIG['MAX_RETRIES']:
time.sleep(CONFIG['RETRY_DELAY'])
else:
raise
@classmethod
def close_all_connections(cls):
"""关闭当前线程的数据库连接"""
if hasattr(cls._thread_local, "client"):
cls._thread_local.client.disconnect()
delattr(cls._thread_local, "client")
logger.info(f"Closed database connection for thread {threading.current_thread().name}")
class DataProcessor:
def __init__(self):
self.columns = [
"a", "b", "c", "d"
]
self.query = '''
SELECT
a,b,c,d
FROM
table_name
ORDER BY
a,b,c,d
'''
self.file_lock = Lock() # 添加文件写入锁
self.total_rows = 0 # 添加行数统计
self.processed_batches = set() # 记录已成功处理的批次
self.failed_batches = set() # 记录失败的批次
def fetch_data_batch(self, batch_size: int, start: int) -> List[tuple]:
"""获取一批数据,带重试机制"""
retry_count = 0
while retry_count < CONFIG['MAX_RETRIES']:
try:
with DatabaseManager.get_connection() as client:
query_with_limit = f"{self.query} LIMIT {batch_size} OFFSET {start}"
result = client.execute(query_with_limit)
logger.info(f"Fetched {len(result)} records starting from {start}.")
return result
except Exception as e:
retry_count += 1
logger.error(f"Error fetching batch starting at {start} (attempt {retry_count}): {str(e)}")
if retry_count < CONFIG['MAX_RETRIES']:
time.sleep(CONFIG['RETRY_DELAY'])
else:
raise
def save_to_csv(self, df: pd.DataFrame, file_name: str, batch_start: int):
"""保存数据到CSV文件"""
try:
with self.file_lock: # 使用锁保护文件写入
file_exists = os.path.exists(file_name) and os.path.getsize(file_name) > 0
df.to_csv(
file_name,
mode='a',
header= not file_exists,
index=False
)
self.total_rows += len(df)
self.processed_batches.add(batch_start)
logger.info(f"Appended {len(df)} records to {file_name}. Total rows: {self.total_rows}")
except Exception as e:
logger.error(f"Error saving to CSV: {str(e)}")
raise
def process_batch(self, start: int, batch_size: int, output_file: str):
"""处理单个批次的数据"""
try:
if start in self.processed_batches:
logger.info(f"Batch {start} already processed, skipping.")
return True
result_batch = self.fetch_data_batch(batch_size, start)
df_batch = pd.DataFrame(result_batch, columns=self.columns)
self.save_to_csv(df_batch, output_file, start)
return True
except Exception as e:
logger.error(f"Error processing batch starting at {start}: {str(e)}")
self.failed_batches.add(start)
return False
def main_v1():
try:
processor = DataProcessor()
output_file = CONFIG['OUTPUT_FILE']
# 清空或创建输出文件
with open(output_file, 'w', encoding='utf-8') as f:
pass
queue = Queue()
retry_queue = Queue() # 用于重试失败的批次
threads = []
def worker():
while True:
try:
start = queue.get()
if start is None:
break
success = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)
if not success:
retry_queue.put(start)
queue.task_done()
except Exception as e:
logger.error(f"Worker thread error: {str(e)}")
finally:
queue.task_done()
# 启动工作线程
for _ in range(CONFIG['NUM_THREADS']):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
threads.append(t)
# 添加任务到队列
for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):
queue.put(start)
# 等待主要处理完成
queue.join()
# 处理失败的批次
while not retry_queue.empty():
start = retry_queue.get()
logger.info(f"Retrying failed batch starting at {start}")
if processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file):
logger.info(f"Successfully retried batch {start}")
else:
logger.error(f"Failed to process batch {start} after retries")
# 停止所有线程
for _ in threads:
queue.put(None)
for t in threads:
t.join()
# 最终验证
logger.info(f"Processing completed. Total rows: {processor.total_rows}")
logger.info(f"Processed batches: {len(processor.processed_batches)}")
logger.info(f"Failed batches: {len(processor.failed_batches)}")
if processor.failed_batches:
logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")
except Exception as e:
logger.error(f"Main process error: {str(e)}")
raise
finally:
DatabaseManager.close_all_connections()
def main():
try:
processor = DataProcessor()
output_file = CONFIG['OUTPUT_FILE']
# 清空或创建输出文件
with open(output_file, 'w', encoding='utf-8') as f:
pass
queue = Queue()
retry_queue = Queue()
threads = []
def worker():
while True:
try:
start = queue.get()
if start is None: # 退出信号
queue.task_done()
break
try:
success = processor.process_batch(start, CONFIG['BATCH_SIZE'], output_file)
if not success:
retry_queue.put(start)
except Exception as e:
logger.error(f"Error processing batch at offset {start}: {str(e)}")
retry_queue.put(start)
finally:
queue.task_done() # 只在这里调用一次
except Exception as e:
logger.error(f"Worker thread error: {str(e)}")
# 不要在这里调用 queue.task_done()
# 启动工作线程
for _ in range(CONFIG['NUM_THREADS']):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
threads.append(t)
# 添加任务到队列
total_batches = (CONFIG['TOTAL_RECORDS'] + CONFIG['BATCH_SIZE'] - 1) // CONFIG['BATCH_SIZE']
for start in range(0, CONFIG['TOTAL_RECORDS'], CONFIG['BATCH_SIZE']):
queue.put(start)
# 等待主要处理完成
queue.join()
# 处理失败的批次
retry_count = 0
max_retries = 3
while not retry_queue.empty() and retry_count < max_retries:
retry_count += 1
retry_size = retry_queue.qsize()
logger.info(f"Retrying {retry_size} failed batches (attempt {retry_count})")
# 将失败的批次重新放入主队列
for _ in range(retry_size):
start = retry_queue.get()
queue.put(start)
# 等待重试完成
queue.join()
# 停止所有线程
for _ in threads:
queue.put(None)
for t in threads:
t.join()
# 最终验证
logger.info(f"Processing completed. Total rows: {processor.total_rows}")
logger.info(f"Expected batches: {total_batches}")
logger.info(f"Processed batches: {len(processor.processed_batches)}")
logger.info(f"Failed batches: {len(processor.failed_batches)}")
if processor.failed_batches:
logger.warning(f"Failed batches: {sorted(processor.failed_batches)}")
# 验证数据完整性
try:
df_final = pd.read_csv(output_file)
actual_rows = len(df_final)
logger.info(f"Final CSV file contains {actual_rows} rows")
if actual_rows != processor.total_rows:
logger.warning(f"Row count mismatch: CSV has {actual_rows} rows, but processed {processor.total_rows} rows")
# 检查是否有重复的表头
duplicate_headers = df_final[df_final.iloc[:, 0] == df_final.columns[0]]
if not duplicate_headers.empty:
logger.warning(f"Found {len(duplicate_headers)} duplicate headers at rows: {duplicate_headers.index.tolist()}")
# 清理重复表头
df_final = df_final[df_final.iloc[:, 0] != df_final.columns[0]]
df_final.to_csv(output_file, index=False)
logger.info(f"Cleaned CSV file now contains {len(df_final)} rows")
except Exception as e:
logger.error(f"Error validating final CSV file: {str(e)}")
except Exception as e:
logger.error(f"Main process error: {str(e)}")
raise
finally:
DatabaseManager.close_all_connections()
if __name__ == "__main__":
start_time = timeit.default_timer()
try:
main()
elapsed_time = timeit.default_timer() - start_time
logger.info(f"数据提取和存储完成,耗时: {elapsed_time:.2f} 秒")
except Exception as e:
logger.error(f"程序执行失败: {str(e)}")
raise
主要类
- DatabaseManager
管理数据库连接的线程安全类
使用 threading.local() 确保每个线程有自己的连接
包含重试机制和连接管理功能
- DataProcessor
处理数据的核心类
定义了数据列和查询语句
处理数据批次的获取和保存
跟踪处理状态和失败的批次
2. 工作流程
- 初始化
创建空的输出文件
初始化线程池和任务队列
- 数据处理
将总数据量分成多个批次
多个工作线程并行处理数据批次
每个批次:
- 从数据库获取数据
- 转换为 DataFrame
- 保存到 CSV 文件
- 错误处理
失败的批次会进入重试队列
最多重试 3 次
记录所有失败的批次
- 数据验证
检查最终 CSV 文件的行数
检查和清理重复的表头
验证数据完整性
3. 特点
- 线程安全
使用线程本地存储管理数据库连接
文件写入使用锁保护
- 容错机制
数据库连接重试
批次处理重试
详细的日志记录
- 性能优化
批量处理数据
多线程并行处理
使用队列管理任务
- 监控和日志
详细的日志记录
处理进度跟踪
执行时间统计
这个程序适合处理大量数据的导出任务,具有良好的容错性和可靠性。