A Python Batch Data Exporter for Multiple Labeled Snowflake SQL Queries
Design a Python program driven by a JSON configuration file containing multiple labeled Snowflake SQL statements. Given different input parameters, it should automatically batch-export data from a Snowflake database to CSV files in a local directory, naming each file after its label plus the .csv extension and overwriting any file that already exists. The program uses the Python Snowflake Connector and must handle large result sets by exporting in batches, using multithreading and asynchronous operations to improve performance. It needs exception handling and should log the error message when something fails, the run status and row count of each query export, the run timestamp, the export duration, and the record count of every output file.
import argparse
import csv
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import snowflake.connector
from typing import Dict, List
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("data_export.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class SnowflakeExporter:
def __init__(self, config: Dict, output_dir: str = "./output"):
self.snowflake_config = {
"user": os.getenv("SNOWFLAKE_USER"),
"password": os.getenv("SNOWFLAKE_PASSWORD"),
"account": os.getenv("SNOWFLAKE_ACCOUNT"),
"warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
"database": os.getenv("SNOWFLAKE_DATABASE"),
"schema": os.getenv("SNOWFLAKE_SCHEMA")
}
self.output_dir = output_dir
os.makedirs(self.output_dir, exist_ok=True)
def _get_file_path(self, label: str) -> str:
return os.path.join(self.output_dir, f"{label}.csv")
def _export_query(self, query_config: Dict):
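        # Run one labeled query and stream its result set to <label>.csv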
label = query_config["label"]
sql = query_config["sql"]
params = query_config.get("params", {})
file_path = self._get_file_path(label)
start_time = datetime.now()
total_rows = 0
status = "SUCCESS"
error_msg = None
        conn = None
        cursor = None
        try:
            conn = snowflake.connector.connect(**self.snowflake_config)
            cursor = conn.cursor()
logger.info(f"Executing query for [{label}]")
cursor.execute(sql, params)
            # Read column names from the cursor metadata
columns = [col[0] for col in cursor.description]
            # Open the output file in 'w' mode, overwriting any existing file
with open(file_path, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(columns)
                # Fetch and write rows in batches to bound memory use
while True:
rows = cursor.fetchmany(10000)
if not rows:
break
writer.writerows(rows)
total_rows += len(rows)
except Exception as e:
status = "FAILED"
error_msg = str(e)
logger.error(f"Error exporting {label}: {error_msg}", exc_info=True)
        finally:
            if cursor is not None:
                cursor.close()
            if conn is not None:
                conn.close()
duration = (datetime.now() - start_time).total_seconds()
        # Log a structured summary of this export run
log_entry = {
"timestamp": datetime.now().isoformat(),
"label": label,
"status": status,
"file_path": file_path,
"rows_exported": total_rows,
"duration_seconds": round(duration, 2),
"error_message": error_msg
}
logger.info(json.dumps(log_entry, indent=2))
def execute_export(self, queries: List[Dict], max_workers: int = 5):
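        # Submit one export task per query; each task catches and logs its own errors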
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for query in queries:
futures.append(executor.submit(self._export_query, query))
for future in futures:
try:
future.result()
except Exception as e:
logger.error(f"Thread execution error: {str(e)}")
def main():
parser = argparse.ArgumentParser(description="Snowflake Data Exporter")
parser.add_argument("--config", required=True, help="Path to JSON config file")
parser.add_argument("--output-dir", default="./output", help="Output directory")
parser.add_argument("--labels", nargs="+", help="Filter queries by labels")
args = parser.parse_args()
    # Load the JSON config file
with open(args.config) as f:
config_data = json.load(f)
    # Optionally filter queries by label
queries = config_data.get("queries", [])
if args.labels:
queries = [q for q in queries if q.get("label") in args.labels]
    # Run the exports
exporter = SnowflakeExporter(config_data, args.output_dir)
exporter.execute_export(queries)
if __name__ == "__main__":
main()
Program Description
- Configuration file structure:
{
"queries": [
{
"label": "user_data",
"sql": "SELECT * FROM users WHERE created_at >= %(start_date)s",
"params": {
"start_date": "2023-01-01"
}
}
]
}
- Key features:
  - Multithreading: concurrent exports via ThreadPoolExecutor
  - Batched fetching: rows are fetched 10,000 at a time so large result sets stay manageable
  - Automatic overwrite: output files are always opened in 'w' mode
  - Detailed logging: writes to both a file and the console, including run status in JSON format
  - Error handling: all exceptions are caught and logged with full details
  - Parameterized queries: guard against SQL injection (a binding sketch follows this list)
  - Environment variables: sensitive credentials stay out of the config file
- Usage:
export SNOWFLAKE_USER=your_user
export SNOWFLAKE_PASSWORD=your_password
python exporter.py --config queries.json --output-dir ./data --labels user_data sales_data
- Sample log entry:
{
"timestamp": "2023-10-10T15:30:45.123456",
"label": "user_data",
"status": "SUCCESS",
"file_path": "./output/user_data.csv",
"rows_exported": 150000,
"duration_seconds": 12.34,
"error_message": null
}
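Since the config example relies on parameter binding, here is a minimal standalone sketch of how the params dict maps to placeholders. snowflake-connector-python uses the pyformat paramstyle by default, so %(name)s placeholders are bound from a dict; the table and column names below are hypothetical:

import os
import snowflake.connector

conn = snowflake.connector.connect(
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
)
try:
    cur = conn.cursor()
    # Values are bound by the driver, never interpolated into the SQL text
    cur.execute(
        "SELECT COUNT(*) FROM users WHERE created_at >= %(start_date)s",
        {"start_date": "2023-01-01"},
    )
    print(cur.fetchone()[0])
finally:
    conn.close()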
Optimization Notes
- Memory management:
  - Data is fetched from the cursor in batches (fetchmany)
  - Rows are streamed to the CSV file, so the full result set is never held in memory
- Concurrency control:
  - A ThreadPoolExecutor manages the worker threads (an as_completed variant is sketched after this list)
  - 5 workers by default (tune to your hardware)
- Observability:
  - Structured log records
  - Precise performance metrics (duration, rows processed)
  - Error stack traces are captured (exc_info=True)
- Security:
  - Parameterized queries prevent SQL injection
  - Sensitive values are passed via environment variables
  - Deterministic cleanup (the with statement closes the output file; the finally block closes the cursor and connection)
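As a variant of execute_export, the sketch below uses concurrent.futures.as_completed so that errors surface as soon as each task finishes rather than in submission order. Here export_fn stands in for the bound _export_query method; it is an assumption for illustration, not part of the program above:

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List

def export_all(queries: List[Dict], export_fn: Callable[[Dict], None], max_workers: int = 5) -> None:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its label for error reporting
        future_to_label = {executor.submit(export_fn, q): q["label"] for q in queries}
        for future in as_completed(future_to_label):
            label = future_to_label[future]
            try:
                future.result()
            except Exception as exc:
                print(f"{label} failed: {exc}")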
The following can be tuned to match your workload (a CLI-flag sketch follows this list):
- fetchmany batch size (currently 10,000)
- Thread pool size (default 5)
- Log format and verbosity
- File encoding (currently utf-8)
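If these knobs need to change per run, they can be promoted to command-line flags. This is a sketch extending the existing argparse setup; the flag names are suggestions, and the parsed values would then be passed through to fetchmany() and ThreadPoolExecutor():

import argparse

parser = argparse.ArgumentParser(description="Snowflake Data Exporter")
parser.add_argument("--config", required=True, help="Path to JSON config file")
parser.add_argument("--batch-size", type=int, default=10000,
                    help="Rows fetched per fetchmany() call")
parser.add_argument("--max-workers", type=int, default=5,
                    help="Thread pool size for concurrent exports")
args = parser.parse_args()
# e.g. cursor.fetchmany(args.batch_size) and
#      ThreadPoolExecutor(max_workers=args.max_workers)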