一键生成数据库对应的所有DataX的json文件
Datax是一个非常优秀的数据导入导出工具,想必小伙伴们都使用过,但是今天老板说:小张,你把mysql中的所有表都导入到hive的ods层,这该怎么办?一张表对应一个json文件,这不得写一个月?我们可以通过python编写一个脚本来实现,说干就干。
一、知识必备:
json.dump() 是 Python 中用于将一个 Python 对象序列化为 JSON 格式并将其写入到文件中的函数。它来自于 json 模块,这个模块是标准库的一部分,专门用来处理 JSON 数据。
以下是一个基本的使用示例:
import json
# 一个 Python 字典对象
data = {
"name": "John Doe",
"age": 30,
"city": "New York"
}
# 打开一个文件以写入模式
with open('output.json', 'w') as f:
# 使用 json.dump() 将数据写入文件
json.dump(data, f)
在这个例子中,字典 data 被转换为 JSON 并写入到名为 output.json 的文件中。这将生成一个包含如下内容的文件:
{
"name": "John Doe",
"age": 30,
"city": "New York"
}
二、编写python代码
第一步:能够通过python读取mysql的数据库的表
import pymysql
def getDBData(dbName,tableName):
db_connection = pymysql.connect(
host="bigdata01",
port=3306,
user="root",
password="123456",
database='information_schema'
)
cursor = db_connection.cursor()
sql = f"select column_name ,data_type from information_schema.`COLUMNS` where TABLE_SCHEMA = '{dbName}' and table_name = '{tableName}' order by ordinal_position"
cursor.execute(sql)
result = cursor.fetchall()
cursor.close()
db_connection.close()
return result
if __name__ == '__main__':
result = getDBData("spark_project","oms_order")
print(result)
第二步:使用json.dump将文件保存起来
第三步:拼接json文件
以下是编写好的python脚本:
import pymysql
def getDBData(dbName,tableName):
db_connection = pymysql.connect(
host="bigdata01",
port=3306,
user="root",
password="123456",
database='information_schema'
)
cursor = db_connection.cursor()
sql = f"select column_name ,data_type from information_schema.`COLUMNS` where TABLE_SCHEMA = '{dbName}' and table_name = '{tableName}' order by ordinal_position"
cursor.execute(sql)
result = cursor.fetchall()
cursor.close()
db_connection.close()
return result
def getFiledName(dbName,tableName):
listTuple = getDBData(dbName,tableName)
a = list(map(lambda x:x[0],listTuple))
return ",".join(a)
def getFiledsNameAndType(dbName,tableName):
k1 = getDBData(dbName, tableName)
# 作用是将mysql中的数据类型,映射为hive表中的数据类型
mappings = {
'bigint': 'bigint',
'varchar': 'string',
'int': 'int',
'datetime': 'string',
'text': 'string',
'decimal':'string'
}
# {
# "name": "status",
# "type": "int"
# }
a = list(map(lambda x: {"name": x[0],"type": mappings[x[1]]}, k1))
return a
import json
import sys
if __name__ == '__main__':
# 获取外部参数值
if len(sys.argv) == 3:
dbName = sys.argv[1]
tableName = sys.argv[2]
else:
print("未传入外部参数或参数不够")
sys.exit(-1)
result = getDBData(dbName,tableName)
print(result)
# data = {"name":"张三","age":30}
data = {
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"querySql": [
"select "+ getFiledName(dbName,tableName) + " from "+tableName
],
"jdbcUrl": [
"jdbc:mysql://bigdata01:3306/"+dbName
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://bigdata01:9820",
"fileType": "text",
"path": f"/user/hive/warehouse/ods.db/ods_{tableName}"+"/dt=${dtime}",
"fileName": tableName,
"writeMode": "append",
"column":
getFiledsNameAndType(dbName,tableName)
,
"fieldDelimiter": "\t",
"partition": "dt"
}
}
}
]
}
}
with open(f"./jsonfile/{tableName}.json","w",encoding="utf-8") as f:
json.dump(data, f)
三、运行上述python脚本
python AutoCreateJson.py spark_project cms_subject
python AutoCreateJson.py spark_project cms_subject_category
四、脚本升级
根据以上写法,我还要执行93次脚本,很累,耽误打游戏,于是编写了一个一件生成所有表的json的脚本:
#!/bin/bash
# 运行MySQL命令并将结果存储到文件
mysql -uroot -p123456 -h bigdata01 -e "use spark_project; show tables;" > /tables.txt
# 使用awk命令提取数据库名称并存储到集合中
database_names=($(awk '{print $1}' /tables.txt))
# 打印集合中的数据库名称
echo "${database_names[@]}"
for tableName in ${database_names[*]};
do
python AutoCreateJson.py spark_project $tableName
done
第一步:在/home下创建文件夹 jsonfile
第二步:下载pymysql
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pymysql
第三步:将py脚本上传至/home下
第四步:运行脚本
sh create_all.sh