当前位置: 首页 > article >正文

Python带多组标签的Snowflake SQL查询批量数据导出程序

设计一个基于多个带标签Snowflake SQL语句作为json配置文件的Python代码程序,实现根据不同的输入参数自动批量地将Snowflake数据库的数据导出为CSV文件到本地目录上,标签加扩展名.csv为导出数据文件名,文件已经存在则覆盖原始文件。使用Python Snowflake Connector,需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来提高程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记录数量的日志。

import argparse
import csv
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import snowflake.connector
from typing import Dict, List, Optional

# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("data_export.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class SnowflakeExporter:
    def __init__(self, config: Dict, output_dir: str = "./output"):
        self.snowflake_config = {
            "user": os.getenv("SNOWFLAKE_USER"),
            "password": os.getenv("SNOWFLAKE_PASSWORD"),
            "account": os.getenv("SNOWFLAKE_ACCOUNT"),
            "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
            "database": os.getenv("SNOWFLAKE_DATABASE"),
            "schema": os.getenv("SNOWFLAKE_SCHEMA")
        }
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)
        
    def _get_file_path(self, label: str) -> str:
        return os.path.join(self.output_dir, f"{label}.csv")

    def _export_query(self, query_config: Dict):
        label = query_config["label"]
        sql = query_config["sql"]
        params = query_config.get("params", {})
        file_path = self._get_file_path(label)
        
        start_time = datetime.now()
        total_rows = 0
        status = "SUCCESS"
        error_msg = None
        
        try:
            conn = snowflake.connector.connect(**self.snowflake_config)
            cursor = conn.cursor()
            
            logger.info(f"Executing query for [{label}]")
            cursor.execute(sql, params)
            
            # 获取列信息
            columns = [col[0] for col in cursor.description]
            
            # 初始化文件写入
            with open(file_path, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(columns)
                
                # 分批处理数据
                while True:
                    rows = cursor.fetchmany(10000)
                    if not rows:
                        break
                    writer.writerows(rows)
                    total_rows += len(rows)
                    
        except Exception as e:
            status = "FAILED"
            error_msg = str(e)
            logger.error(f"Error exporting {label}: {error_msg}", exc_info=True)
            
        finally:
            cursor.close()
            conn.close()
            duration = (datetime.now() - start_time).total_seconds()
            
            # 记录日志
            log_entry = {
                "timestamp": datetime.now().isoformat(),
                "label": label,
                "status": status,
                "file_path": file_path,
                "rows_exported": total_rows,
                "duration_seconds": round(duration, 2),
                "error_message": error_msg
            }
            logger.info(json.dumps(log_entry, indent=2))

    def execute_export(self, queries: List[Dict], max_workers: int = 5):
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = []
            for query in queries:
                futures.append(executor.submit(self._export_query, query))
                
            for future in futures:
                try:
                    future.result()
                except Exception as e:
                    logger.error(f"Thread execution error: {str(e)}")

def main():
    parser = argparse.ArgumentParser(description="Snowflake Data Exporter")
    parser.add_argument("--config", required=True, help="Path to JSON config file")
    parser.add_argument("--output-dir", default="./output", help="Output directory")
    parser.add_argument("--labels", nargs="+", help="Filter queries by labels")
    args = parser.parse_args()

    # 加载配置文件
    with open(args.config) as f:
        config_data = json.load(f)
    
    # 过滤查询配置
    queries = config_data.get("queries", [])
    if args.labels:
        queries = [q for q in queries if q.get("label") in args.labels]
    
    # 执行导出
    exporter = SnowflakeExporter(config_data, args.output_dir)
    exporter.execute_export(queries)

if __name__ == "__main__":
    main()

程序说明

  1. 配置文件结构
{
  "queries": [
    {
      "label": "user_data",
      "sql": "SELECT * FROM users WHERE created_at >= %(start_date)s",
      "params": {
        "start_date": "2023-01-01"
      }
    }
  ]
}
  1. 主要特性
  • 多线程处理:使用ThreadPoolExecutor实现并发导出
  • 分批处理:每次获取10,000条记录处理大数据
  • 自动重写文件:始终使用’w’模式打开文件
  • 详细日志:记录到文件和控制台,包含JSON格式的运行状态
  • 错误处理:捕获所有异常并记录详细信息
  • 参数化查询:防止SQL注入攻击
  • 环境变量:通过环境变量管理敏感信息
  1. 运行方式
export SNOWFLAKE_USER=your_user
export SNOWFLAKE_PASSWORD=your_password
python exporter.py --config queries.json --output-dir ./data --labels user_data sales_data
  1. 日志示例
{
  "timestamp": "2023-10-10T15:30:45.123456",
  "label": "user_data",
  "status": "SUCCESS",
  "file_path": "./output/user_data.csv",
  "rows_exported": 150000,
  "duration_seconds": 12.34,
  "error_message": null
}

优化点说明

  1. 内存管理
  • 使用服务器端游标分批获取数据(fetchmany)
  • 流式写入CSV文件,避免内存中保存完整结果集
  1. 并发控制
  • 通过ThreadPoolExecutor管理线程池
  • 默认5个worker(可根据硬件调整)
  1. 可观测性
  • 结构化日志记录
  • 精确的性能指标(持续时间、处理行数)
  • 错误堆栈信息记录
  1. 安全措施
  • 参数化查询防止SQL注入
  • 敏感信息通过环境变量传递
  • 自动清理资源(with语句保证连接关闭)

可根据实际需求调整以下参数:

  • fetchmany的批量大小(当前10,000)
  • 线程池大小(默认5)
  • 日志格式和详细程度
  • 文件编码方式(当前utf-8)

http://www.kler.cn/a/584370.html

相关文章:

  • (十一) 人工智能 - Python 教程 - Python元组
  • springboot_data介绍
  • JavaScript语言的区块链隐私
  • centos7使用gpu加速的MinerU
  • Java 和 Kotlin 实现 23 种设计模式:从理论到实践
  • 【Python办公】Excel通用匹配工具(双表互匹)
  • PHP火山引擎API签名方法
  • 【Python办公自动化】—Excel中相同编号自动添加-1-2-3...
  • Word中把参考文献引用改为上标
  • 描述K8S创建pod的全过程
  • Linux 系统负载过高的排查思路
  • Secure and Privacy-Preserving Decentralized Federated Learning同态加密联邦学习文献阅读
  • 大数据与区块链——天作之合的技术搭档
  • SiC/GaN器件测试新选择:MHO5000如何破解高频开关噪声难题?
  • android 调用wps打开文档并感知保存事件
  • 吊舱视频参数设置
  • Pygame实现记忆拼图游戏1
  • git 浅克隆及后续分支快速切换
  • (三)穷人技术部署方案:基于K80显卡的DeepSeek-Ollama部署
  • 上海利氪科技-再次续订MappingSpace