当前位置: 首页 > article >正文

一键生成数据库对应的所有DataX的json文件

       Datax是一个非常优秀的数据导入导出工具,想必小伙伴们都使用过,但是今天老板说:小张,你把mysql中的所有表都导入到hive的ods层,这该怎么办?一张表对应一个json文件,这不得写一个月?我们可以通过python编写一个脚本来实现,说干就干。

一、知识必备:

json.dump() 是 Python 中用于将一个 Python 对象序列化为 JSON 格式并将其写入到文件中的函数。它来自于 json 模块,这个模块是标准库的一部分,专门用来处理 JSON 数据。

以下是一个基本的使用示例:

import json

# 一个 Python 字典对象
data = {
    "name": "John Doe",
    "age": 30,
    "city": "New York"
}

# 打开一个文件以写入模式
with open('output.json', 'w') as f:
    # 使用 json.dump() 将数据写入文件
    json.dump(data, f)

在这个例子中,字典 data 被转换为 JSON 并写入到名为 output.json 的文件中。这将生成一个包含如下内容的文件:

{
    "name": "John Doe",
    "age": 30,
    "city": "New York"
}

二、编写python代码

第一步:能够通过python读取mysql的数据库的表

import pymysql
def getDBData(dbName,tableName):
    db_connection = pymysql.connect(
        host="bigdata01",
        port=3306,
        user="root",
        password="123456",
        database='information_schema'
    )
    cursor = db_connection.cursor()
    sql = f"select column_name ,data_type from information_schema.`COLUMNS` where TABLE_SCHEMA = '{dbName}' and table_name = '{tableName}' order by ordinal_position"
    cursor.execute(sql)
    result = cursor.fetchall()
    cursor.close()
    db_connection.close()
    return result

if __name__ == '__main__':
    result = getDBData("spark_project","oms_order")
    print(result)

第二步:使用json.dump将文件保存起来

第三步:拼接json文件

以下是编写好的python脚本:

import pymysql
def getDBData(dbName,tableName):
	db_connection = pymysql.connect(
		host="bigdata01",
		port=3306,
		user="root",
		password="123456",
		database='information_schema'
	)
	cursor = db_connection.cursor()
	sql = f"select column_name ,data_type from information_schema.`COLUMNS` where TABLE_SCHEMA = '{dbName}' and table_name = '{tableName}' order by ordinal_position"
	cursor.execute(sql)
	result = cursor.fetchall()
	cursor.close()
	db_connection.close()
	return result
def getFiledName(dbName,tableName):
	listTuple = getDBData(dbName,tableName)
	a = list(map(lambda x:x[0],listTuple))

	return ",".join(a)

def getFiledsNameAndType(dbName,tableName):
	k1 = getDBData(dbName, tableName)
	# 作用是将mysql中的数据类型,映射为hive表中的数据类型
	mappings = {
		'bigint': 'bigint',
		'varchar': 'string',
		'int': 'int',
		'datetime': 'string',
		'text': 'string',
		'decimal':'string'
	}
	#                {
	#                 "name": "status",
	#                 "type": "int"
	#               }
	a = list(map(lambda x: {"name": x[0],"type": mappings[x[1]]}, k1))
	return a

import json
import sys
if __name__ == '__main__':

	# 获取外部参数值
	if len(sys.argv) == 3:
		dbName = sys.argv[1]
		tableName = sys.argv[2]
	else:
		print("未传入外部参数或参数不够")
		sys.exit(-1)


	result = getDBData(dbName,tableName)
	print(result)
	# data = {"name":"张三","age":30}

	data = {
		  "job": {
			"setting": {
						"speed": {
							 "channel": 3
						},
						"errorLimit": {
							"record": 0,
							"percentage": 0.02
						}
					},
			"content": [
			  {
				"reader": {
				  "name": "mysqlreader",
				  "parameter": {
					"username": "root",
					"password": "123456",
					"connection": [
					  {
						"querySql": [
						  "select "+ getFiledName(dbName,tableName) + " from "+tableName
						],
						"jdbcUrl": [
						  "jdbc:mysql://bigdata01:3306/"+dbName
						]
					  }
					]
				  }
				},
				"writer": {
				  "name": "hdfswriter",
				  "parameter": {
					"defaultFS": "hdfs://bigdata01:9820",
					"fileType": "text",
					"path": f"/user/hive/warehouse/ods.db/ods_{tableName}"+"/dt=${dtime}",
					"fileName": tableName,
					"writeMode": "append",
					"column":
					  getFiledsNameAndType(dbName,tableName)
					,
					"fieldDelimiter": "\t",
					"partition": "dt"
				  }
				}
			  }
			]
		  }
		}
	with open(f"./jsonfile/{tableName}.json","w",encoding="utf-8") as f:
		json.dump(data, f)

三、运行上述python脚本

python AutoCreateJson.py spark_project cms_subject
python AutoCreateJson.py spark_project cms_subject_category

四、脚本升级

根据以上写法,我还要执行93次脚本,很累,耽误打游戏,于是编写了一个一件生成所有表的json的脚本:

#!/bin/bash

# 运行MySQL命令并将结果存储到文件
mysql -uroot -p123456 -h bigdata01 -e "use spark_project; show tables;" > /tables.txt

# 使用awk命令提取数据库名称并存储到集合中
database_names=($(awk '{print $1}' /tables.txt))

# 打印集合中的数据库名称
echo "${database_names[@]}"
for tableName in ${database_names[*]};
do
  python AutoCreateJson.py spark_project $tableName
done
第一步:在/home下创建文件夹 jsonfile
第二步:下载pymysql
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pymysql
第三步:将py脚本上传至/home下
第四步:运行脚本
sh create_all.sh

http://www.kler.cn/a/420584.html

相关文章:

  • 使用Mybatis-Plus时遇到的报错问题及解决方案
  • 遇到问题:hive中的数据库和sparksql 操作的数据库不是同一个。
  • 计算机网络-网络安全
  • 在Scala中Array不可变的学习
  • 【人工智能】用Python和Pandas进行时间序列数据分析:从处理到预测
  • 详解Rust异步编程
  • mvc基础及搭建一个静态网站
  • Ubantu系统docker运行成功拉取失败【成功解决】
  • GateWay使用手册
  • 清理Linux/CentOS7根目录的思路
  • Vue3 脚手架扩展
  • Proteus8.17下载安装教程
  • MySQL安装部署
  • IP划分(笔记)
  • 对于Oracle来说,土地管理是非核心域吗
  • 【机器学习】机器学习的基本分类-监督学习-逻辑回归-对数似然损失函数(Log-Likelihood Loss Function)
  • Apache-HertzBeat开源实时监控系统存在默认口令漏洞
  • mysql一个事务最少几次IO操作
  • ESP32开发板在micropython里直接用requests向web服务器发送请求:ESP32S3开发板通过fastapi中转成功连接星河大模型
  • 负载均衡指南:Nginx与HAProxy的配置与优化
  • mysql 查询所有的触发器
  • vmware linux centos7 网络配置
  • 大数据-238 离线数仓 - 广告业务 点击次数 ADS层、广告效果分析 ADS 层 需求分析与加载
  • 2024-11-29 学习人工智能的Day33 BP算法和神经网络小实验
  • Python字符串对齐的几种方法、Python填充与对齐、Python中英文对齐
  • 软件测试常问面试问题及项目流程相关概念