常用命令之LinuxOracleHivePython
1. 用户改密
passwd app_adm
chage -l app_adm
passwd -x 90 app_adm -> 执行操作后,app_adm用户的密码时间改为90天有效期
--查看该euser用户过期信息使用chage命令
--chage的参数包括
---m 密码可更改的最小天数。为零时代表任何时候都可以更改密码。
---M 密码保持有效的最大天数。
---W 用户密码到期前,提前收到警告信息的天数。
---E 帐号到期的日期。过了这天,此帐号将不可用。
---d 上一次更改的日期
---i 停滞时期。如果一个密码已过期这些天,那么此帐号将不可用。
---l 例出当前的设置。由非特权用户来确定他们的密码或帐号何时过期
2. 开机自启
#方式一
将开机自启脚本加到/etc/rc.d/rc.loacl
#方式二
systemctl enable nfs-server
systemctl enable rpcbind
#查看是否开机自启
systemctl list-unit-files|grep enabled | grep nfs-server
systemctl list-unit-files|grep enabled | grep rpcbind
3.查找&推送命令
find /home -size +1M --查找 /home 目录下大于 1MB 的文件
find . -name "*.c" --将当前目录及其子目录下所有文件后缀为 .c 的文件列出来
grep -irnl "dim_poia_capital_zone" /home/app_adm/etl/Hqlexec/sql --查询Linux文件中包含字符的文件名称
ssh -l app_adm 10.169.1.178 --ssh指定用户访问
scp -P 4588 hdfs@stmg102:/usr/local/sin.sh /home/administrator --以及scp集群间拷贝文件
hdfs dfs -chmod -R 777 /user/hive/warehouse/
hdfs dfs -chown -R hive:hive /user/hive/warehouse/
hdfs dfs -put -f /usr/local/sin.sh /user/hive/warehouse/sin.sh --如果文件已存在覆盖
hdfs dfs -get /user/hive/warehouse/sin.sh /usr/local/ --如果文件已存在覆盖
4.windows 获取目录下的文件名
# windows中将目录下面的文件名称写到list.txt文件中
dir *.* /B >list.txt
5. 建立软连接
--建议软连接
ln -s /usr/local/python3/bin/python3 /usr/bin/python
--删除软连接,源文件还在
rm mylink
--给当前目录下的所有文件赋权
chmod 766 ./*
--注意:路径采用Linux的写法(/),防止符号转义 */
6.Linux查看wiondows共享盘的用户名和密码
--查看wiondows共享盘的用户名和密码
sudo cat /etc/samba/smb.conf | grep "共享盘名称"
--配置开机自动挂载的可以查看开机自动挂载
cat /etc/fstab
7. kettle 解决数据库中文乱码
右键编辑spoon.bat 添加 "-Dfile.encoding=UTF-8"
保存重启
8.查看内核版本
--查看内核版本
cat /proc/version
--查看是x86
uname -m
9.Git
步骤 1:创建 Git 存储库
步骤 2:复制存储库并添加文件
步骤 3:从存储库中提取更改
步骤 4:使用源代码树分支合并更新
10. kettle 文件通配符
.*CRM.*.xml|EMP_.*.xml
11. linux中的环境变量
/etc/profile :用来设置系统的环境变量,对系统内所有用户生效,source /etc/profile
~/.bashrc:针对某一个特定的用户,设置环境变量只对该用户自己生效
--追加生效
source /etc/profile >> ~/.bashrc
--查看追加
cat /etc/profile >> ~/.bashrc
12. rpm命令
--这个命令可以使rpm 安装时不检查依赖关系
rpm -i software-2.3.4.rpm --nodeps
-- 这个命令可以强制安装rpm包
rpm -ivh software-2.3.4.rpm --force
13. finereport激活码
fd336a2a-4a7e3c38f-6433-d59127605172
14.shell分割字符串并赋值给变量
id=`echo $var | awk -F "=" '{print $2}'`
15. kettle变量
--job中设置
--1.job中设置变量 ->2.下一个转换中[获取变量]组件(客户修改名称以及去空格)->3.转换中直接使用${变量名称} (范围设置在当前job中有效即可,否则变量有重复则会错误)
--转换中设置
--1.转换中设置变量不能在当前转换中使用(可以读取表中的数据->进行设置变量)->2.在下一个转换中使用[获取变量]组件->3.来${变量名称}进行使用 (范围设置在当前job中有效即可,否则变量有重复则会错误)
16. 更新kettle数据库脚本
create table repkettle.r_database_20240424bakup as select * from repkettle.r_database;
update repkettle.r_database t
set t.host_name = '10.169.1.228', t.database_name = 'etltdb'
where t.host_name = '10.169.1.42';
17.flink资源使用
1.任务具体的实现和数据规模
其中数据规模:读取文件的大小 处理数据的行数
2.并行度
窗口大小:窗口越大需要的CPU和内存就越多
并行度因子 :不同的并行度因子对应着不同的并行度
3.JVM堆大小 JVM堆越大,可以同时运行的线程数就越多。 因此,JVM大小可以调整并发度
4.架构
ngnix->flume->kafka->flink->存储(采集-计算-指标计算)
--其中要保证数据丢失问题、重复读取(偏移量)
18.shell脚本调用OracleSQL异常捕获
#add 20221216 捕获异常修复,由于执行OracleSQL进入新的进程,使用判断返回值是否为空的方式捕获异常
cmd=`echo "$cmd"| awk '{printf "%s\n", $0}'`
echo "cmd:"$cmd
if [ ! ${cmd} ]; then
echo "EXECUTION SUCCEEDED"
echo $cmd
exit 0
else
echo "EXECUTION FAILED"
echo $cmd
exit -1
fi
19.jdbc双节点配置
jdbc:oracle:thin:@(DESCRIPTION =(ADDRESS_LIST =(ADDRESS = (PROTOCOL = TCP)(HOST = 10.163.128.79)(PORT = 1521))(ADDRESS = (PROTOCOL = TCP)(HOST = 10.163.128.80)(PORT = 1521))(LOAD_BALANCE = yes))(CONNECT_DATA =(SERVER = DEDICATED)(SERVICE_NAME = etldb)))
20. 创建用户及赋权
--创建用户
create user test123 identified by test123;
--赋权
grant connect,resource to test123;
21.表空间使用情况查询语句
--3. 表空间使用情况
select a.tablespace_name,total,free,total-free used from
( select tablespace_name,sum(bytes)/1024/1024 total from dba_data_files
group by tablespace_name) a,
( select tablespace_name,sum(bytes)/1024/1024 free from dba_free_space
group by tablespace_name) b
where a.tablespace_name=b.tablespace_name;
---1、表占用空间
select segment_name, sum(bytes)/1024/1024 Mbytese from user_segments where segment_type='TABLE' group by segment_name;
--增加表空间
ALTER TABLESPACE dc_dw ADD DATAFILE
'D:\oracle\product\10.2.0\oradata\irdcdev\DC_DW1.DBF' SIZE 500M
AUTOEXTEND ON NEXT 8G MAXSIZE unlimited;
22. 修改MYSQL密码
mysql
use mysql
update user set password=password("Ab123456") where user="root" and host='10.169.1.%';
flush privileges;
select host,user,password from user;
23. 加载csv文件到hive表中
--建表
USE database;
CREATE TABLE table(
ssid int comment 'ssid',
orderid int comment 'orderid'
)
COMMENT 'table comment'
row format delimited fields terminated by ','
STORED AS textfile
location '/data_coffe/ods/dim_channel';
download[hdfs:///***/hotel.csv-20190904-172542.csv hotel.csv] --加载路径
LOAD DATA local INPATH 'hotel.csv' into table database.table;--将数据load到表中
24.MYSQL创建只读用户
--1.创建用户
CREATE USER 'readonly'@'%' IDENTIFIED BY '123456';
CREATE USER 'readonly'@'10.76.2.25' IDENTIFIED BY '123456';
CREATE USER 'readonly'@'10.76.2.26' IDENTIFIED BY '123456';
--2.给用户赋予只读权限
GRANT SELECT ON hive.* TO 'readonly'@'%';
GRANT SELECT ON hive.* TO 'readonly'@'10.76.2.25';
GRANT SELECT ON hive.* TO 'readonly'@'10.76.2.26';
--3.刷新权限
FLUSH PRIVILEGES;
创建只读用户完成;账号:readonly 密码:123456 操作的数据库表:hive.* 指定登录的IP:’%’(所有)
GRANT [权限]
ON [库.表]
TO [用户名]@[IP]
IDENTIFIED BY [密码]
25. HIVE元数据乱码操作
--修改字段注释字符集
--①修改表字段注解和表注解
alter table COLUMNS_V2 modify column COMMENT varchar(256) character set utf8
alter table TABLE_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8
--②修改分区字段注解:
alter table PARTITION_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8 ;
alter table PARTITION_KEYS modify column PKEY_COMMENT varchar(4000) character set utf8;
--③修改索引注解:
alter table INDEX_PARAMS modify column PARAM_VALUE varchar(4000) character set utf8;
SELECT s.NAME,t.tbl_name,UPPER(g.column_name) AS column_name ,g.COMMENT,g.integer_idx, g.type_name
FROM TBLS t, SDS k, COLUMNS_V2 g,DBS s
WHERE t.sd_id = k.sd_id
AND k.cd_id = g.cd_id
and t.DB_ID = s.DB_ID
and g.COMMENT like '?%'
ORDER BY s.NAME,t.tbl_name,g.integer_idx;
SELECT distinct s.NAME,t.tbl_name
FROM TBLS t, SDS k, COLUMNS_V2 g,DBS s
WHERE t.sd_id = k.sd_id
AND k.cd_id = g.cd_id
and t.DB_ID = s.DB_ID
and g.COMMENT like '?%'
ORDER BY s.NAME,t.tbl_name,g.integer_idx;
1.修改表注释
alter table 表名 set tblproperties('comment' = '注释');
2.修改字段注释
alter table 表名 change 旧字段名 新字段名 字段类型 comment '注释';
3.修改分区字段注释,需要在元数据库中执行
update PARTITION_KEYS ps
join TBLS ts on ps.tbl_id = ts.tbl_id
join DBS ds on ts.db_id = ds.db_id
set ps.pkey_comment = '注释'
where lower(ds.name) = '库名'
and lower(ts.tbl_name) = '表名'
and lower(ps.pkey_name) = '分区字段名';
STORED AS PARQUET
TBLPROPERTIES ( 'parquet.compression'='snappy');
STORED AS TEXTFILE;
SELECT s.NAME,t.tbl_name,UPPER(g.column_name) AS column_name ,g.COMMENT,g.integer_idx, g.type_name
FROM TBLS t, SDS k, COLUMNS_V2 g,DBS s
WHERE t.sd_id = k.sd_id
AND k.cd_id = g.cd_id
and t.DB_ID = s.DB_ID
and g.COMMENT like '?%'
and s.NAME='aml'
ORDER BY s.NAME,t.tbl_name,g.integer_idx;
SELECT s.TBL_NAME,t.PARAM_VALUE from TABLE_PARAMS t ,TBLS s
where t.TBL_ID = s.TBL_ID
and t.PARAM_KEY='comment'
order by s.TBL_NAME
;
update TABLE_PARAMS ps join TBLS ts on ps.tbl_id = ts.tbl_id set ps.PARAM_VALUE = '注释' where ps.PARAM_KEY='comment' and ts.TBL_NAME =
SELECT d.NAME,s.TBL_NAME,t.PARAM_VALUE from TABLE_PARAMS t ,TBLS s,DBS d
where t.TBL_ID = s.TBL_ID
and d.DB_ID = s.DB_ID
and t.PARAM_KEY='comment' and PARAM_VALUE like '%?%'
order by s.TBL_NAME
26. excel_to_csv
import pandas as pd
import argparse
parser = argparse.ArgumentParser(description='manual to this script')
parser.add_argument("--input_path", type=str, required=True)
parser.add_argument("--output_path", type=str, required=True)
args = parser.parse_args()
print(args.input_path)
print(args.output_path)
# 格式转换
def excel_to_txt(input_path, output_path):
df = pd.read_excel(input_path, header=None)
print('开始写入txt文件')
with open(output_path,'w') as f:
f.write(df.to_csv(header=False, sep=',', index=False))
if __name__ == '__main__':
print(args.input_path)
print(args.output_path)
input_path = args.input_path
output_path = args.output_path
excel_to_txt(input_path, output_path)
27.文件编码转换,将文件编码转换为UTF-8
# -*- coding: utf-8 -*-
"""
@Author : @luyz
@File : FileToCharacter.py
@Project: ETLScript
@Time : 2023-02-23 10:41:18
@Desc : The file is ...
@Version: v1.0
"""
import sys
import os
import gc
import shutil
import traceback
import time
import chardet
import shutil
def change_coding(src_file,dst_file):
"""
文件编码转换,将文件编码转换为UTF-8
:param file:
:return:
"""
coding='GB18030'
with open(src_file, 'r', encoding=coding) as fr, open(dst_file, 'w', encoding='utf-8') as fw:
content=fr.read()
content=str(content.encode('utf-8'),encoding='utf-8')
#print(content,file=fw)
fw.write(content)
#拷贝文件
def copy_file(s_path,t_path):
source_path = os.path.abspath(s_path)
target_path = os.path.abspath(t_path)
if not os.path.exists(target_path):
os.makedirs(target_path)
if os.path.exists(source_path):
# root 所指的是当前正在遍历的这个文件夹的本身的地址
# dirs 是一个 list,内容是该文件夹中所有的目录的名字(不包括子目录)
# files 同样是 list, 内容是该文件夹中所有的文件(不包括子目录)
for root, dirs, files in os.walk(source_path):
for file in files:
src_file = os.path.join(root, file)
shutil.copy(src_file, target_path)
#print(src_file)
# 重新创建临时目录
def mkdir(tmp_path):
for root, dirs, files in os.walk(tmp_path, topdown=False):
for name in files:
os.remove(os.path.join(root, name))
for name in dirs:
os.rmdir(os.path.join(root, name))
if not os.path.exists(tmp_path):
os.makedirs(tmp_path)
#遍历文件统一将文件从gbk转utf-8
def iterate_file_to_utf8(target_path,tmp_path):
for root,dirs,files in os.walk(target_path,topdown=False):
for file in files:
#获取文件所属目录
#print("root: "+root)
#获取文件路径
src_file = os.path.join(root,file)
fileprefix = os.path.splitext(src_file)[-1]
#print("fileprefix :"+fileprefix)
if fileprefix in (".txt") and fileprefix.strip():
print("[INFO]: file: " + file)
dst_file = os.path.join(tmp_path,file)
#如果文件本身就是utf-8,直接复制
encoding = get_encoding(src_file)
print('[INFO]: ' + str(src_file) + " encoding is: " + str(encoding))
if str(encoding) in ("utf-8"):
print("[INFO]: start copy file.")
shutil.copyfile(src_file, dst_file)
else :
print("[INFO]: file is not utf-8")
change_coding(src_file,dst_file)
def get_encoding(file):
with open(file,'rb') as f:
return chardet.detect(f.read())['encoding']
#获取目录个数
def get_file_wc(path):
count = 0
files = os.listdir(path)
for file in files:
fileprefix = os.path.splitext(file)[-1]
#print("fileprefix :"+fileprefix)
if fileprefix in (".txt") and fileprefix.strip():
#print("fileprefix :"+fileprefix)
count = count + 1
#count = len(files)
print('[INFO]: '+ path + " count :" + str(count))
return count
def get_utf8file_wc(path):
count = 0
files = os.listdir(path)
for file in files:
fileprefix = os.path.splitext(file)[-1]
path_full = os.path.join(path,file)
encoding = get_encoding(path_full)
print('[INFO]: ' + path_full + " encoding :"+str(encoding))
if fileprefix in (".txt") and fileprefix.strip() and str(encoding) in ("utf-8"):
#print("fileprefix :"+fileprefix)
count = count + 1
#count = len(files)
print('[INFO]: ' + path + " count :" + str(count))
return count
if __name__ == "__main__":
# 验证入参个数
if (len( sys.argv ) != 4):
print("[ERROR]: arg error! please input:<src_file><dst_file><file_date> ")
sys.exit(1)
source_path = sys.argv[1]+'/'+sys.argv[3]
target_path = sys.argv[2]+'/'+sys.argv[3]
print("[INFO]: 0.Start printing parameters")
print("[INFO]: source_path: " + sys.argv[1])
print("[INFO]: target_path: " + sys.argv[2])
print("[INFO]: file_date: " + sys.argv[3])
try:
if not os.path.exists(source_path):
print('[INFO]: '+source_path + "dir is not exists")
sys.exit(1)
#1.拷贝文件
print("[INFO]: 1.Start copying files")
copy_file(source_path,target_path)
print("[INFO]: 将文件 {source_path} 拷贝到 {target_path}".format(source_path = source_path,target_path = target_path))
#2.重新创建临时目录
tmp_path = target_path + '/tmp'
print("[INFO]: 2.Create a temp dir: "+tmp_path)
mkdir(tmp_path)
#3.遍历文件统一将文件从gbk转utf8
print("[INFO]: 3.Begins character set conversion")
iterate_file_to_utf8(target_path,tmp_path)
print("[INFO]: file conversion success!!!")
#4.稽核转码之后的文件个数
print("[INFO]: 4.Start auditing the number of files after transcoding")
src_wc = get_file_wc(source_path)
dst_wc = get_file_wc(tmp_path)
result = abs(src_wc - dst_wc)
if abs(result) > 0: # Decimal
print("[INFO]: This audit file count is not passed!!!")
sys.exit(1)
print("[INFO]: The audit was successful!!!")
sys.exit(0)
except Exception:
exe = traceback.format_exc()
print("[ERROR]: file conversion fail...")
print("[ERROR]: source filePath : {source_path}".format(source_path=source_path))
print(exe)
sys.exit(1)
28.Linux定时任务
1.使用yum命令安装Crontab:
yum install vixie-cron
yum install crontabs
注:vixie-cron软件包是cron的主程序;
crontabs软件包是用来安装、卸装、或列举用来驱动 cron 守护进程的表格的程序。
cron是linux的内置服务,但它不自动起来,可以用以下的方法启动、关闭这个服务:
/sbin/service crond start #启动服务
/sbin/service crond stop #关闭服务
/sbin/service crond restart #重启服务
/sbin/service crond reload #重新载入配置
复制
2.查看Crontab状态:
service crond status
ntsysv #查看crontab服务是否已设置为开机启动
chkconfig –level 35 crond on #加入开机自动启动
3.添加定时任务:
crontab -e #编辑cron任务模式
i #默认文字编辑器为vim,按i字母键即可添加cron任务
30 3 * * * /usr/local/etc/rc.d/lighttpd restart #将命令代码放入,此命令意义为每天的03:30 重启apache
ESC #按ESC键退出编辑模式
:wq #键入:wq保存
service crond restart #重启crontab服务
复制
4.查看任务列表:
crontab -l
复制
5.Crontab相关命令:
(1)语 法:
crontab [-u <用户名称>][配置文件] 或 crontab { -l | -r | -e }
-u #<用户名称> 是指设定指定<用户名称>的定时任务,这个前提是你必须要有其权限(比如说是 root)才能够指定他人的时程表。如果不使用 -u user 的话,就是表示设定自己的定时任务。
-l #列出该用户的定时任务设置。
-r #删除该用户的定时任务设置。
-e #编辑该用户的定时任务设置。
(2)命令时间格式 :
* * * * * command
分 时 日 月 周 命令
第1列表示分钟1~59 每分钟用*或者 */1表示
第2列表示小时1~23(0表示0点)
第3列表示日期1~31
第4列表示月份1~12
第5列标识号星期0~6(0表示星期天)
第6列要运行的命令
(3)一些Crontab定时任务例子:
30 21 * * * /usr/local/etc/rc.d/lighttpd restart #每晚的21:30 重启apache
45 4 1,10,22 * * /usr/local/etc/rc.d/lighttpd restart #每月1、10、22日的4 : 45重启apache
10 1 * * 6,0 /usr/local/etc/rc.d/lighttpd restart #每周六、周日的1 : 10重启apache
0,30 18-23 * * * /usr/local/etc/rc.d/lighttpd restart #每天18 : 00至23 : 00之间每隔30分钟重启apache
0 23 * * 6 /usr/local/etc/rc.d/lighttpd restart #每星期六的11 : 00 pm重启apache
* 23-7/1 * * * /usr/local/etc/rc.d/lighttpd restart #晚上11点到早上7点之间,每隔一小时重启apache
* */1 * * * /usr/local/etc/rc.d/lighttpd restart #每一小时重启apache
0 11 4 * mon-wed /usr/local/etc/rc.d/lighttpd restart #每月的4号与每周一到周三的11点重启apache
0 4 1 jan * /usr/local/etc/rc.d/lighttpd restart #一月一号的4点重启apache
*/30 * * * * /usr/sbin/ntpdate cn.pool.ntp.org #每半小时同步一下时间
0 */2 * * * /sbin/service httpd restart #每两个小时重启一次apache
50 7 * * * /sbin/service sshd start #每天7:50开启ssh服务
50 22 * * * /sbin/service sshd stop #每天22:50关闭ssh服务
0 0 1,15 * * fsck /home #每月1号和15号检查/home 磁盘
1 * * * * /home/bruce/backup #每小时的第一分执行 /home/bruce/backup这个文件
00 03 * * 1-5 find /home "*.xxx" -mtime +4 -exec rm {} \; #每周一至周五3点钟,在目录/home中,查找文件名为*.xxx的文件,并删除4天前的文件。
30 6 */10 * * ls #每月的1、11、21、31日是的6:30执行一次ls命令
yum install vixie-cron
yum install crontabs
service crond status
crontab -e #编辑cron任务模式
i #默认文字编辑器为vim,按i字母键即可添加cron任务
30 15 * * 3,5 chmod -R 775 /data/BI/tomcat/webapps/webroot/WEB-INF/reportlets
#将命令代码放入,此命令意义为每周3,周5的15:30 执行chmod
ESC #按ESC键退出编辑模式
:wq #键入:wq保存
service crond restart #重启crontab服务
查看任务列表:crontab -l
定时清理日志
Crontab日志路径:
ll /var/log/cron*
29. windows下,使用putty软件以ssh的方式连接linux,中文显示乱码
问题说明: windows下,使用putty软件以ssh的方式连接linux,中文显示乱码
解决方案:
第一步:查看linux 机器的系统语言字符集。
操作命令:
locale -a
命令执行结果表面,linux机器是utf-8字符集
第二步:打开putty,进行如下操作。
windows–>Translation–>Remote character set,选择与linux机器系统语言一致的字符集。
下图以utf-8为例进行演示:
设置完成后,重新使用ssh连接linux机器,即可。
30.从mysql中提取hive建表语句 Python版本
#!/usr/bin/env python
#-*- coding:utf8 -*-
# 从mysql中提取hive建表语句
import os,sys
import fileinput
import datetime
import pymysql
#reload(sys)
#sys.setdefaultencoding("utf8")
def hive_create_table():
conn = pymysql.Connect(host="10.169.1.102",user='scm',passwd='Ab123456!!',database='hive',charset='utf8')
mycursor = conn.cursor()
# 获取DB_ID
select_DB_ID = "select DB_ID from DBS;"
mycursor.execute(select_DB_ID)
result_DB_ID = mycursor.fetchall()
fo = open("create_tab.sql", "w")
for dir_DB_ID in result_DB_ID :
# 获取数据库名
DB_ID = str(dir_DB_ID)[1:].split(',')[0]
print(DB_ID)
select_DB_NAME = "select NAME from DBS where DB_ID="+DB_ID+";"
print(select_DB_NAME)
mycursor.execute(select_DB_NAME)
result_DB_NAME = mycursor.fetchone()
fo.write("\n--===========数据库:"+str(result_DB_NAME).split('\'')[1]+"===========\n")
DBname=str(result_DB_NAME).split('\'')[1]
print('数据库名字:' + DBname)
print(result_DB_NAME)
# 获取表名
select_table_name_sql = "select TBL_NAME from TBLS where DB_ID="+DB_ID+";"
mycursor.execute(select_table_name_sql)
result_table_names = mycursor.fetchall()
for table_name_ in result_table_names :
#如果存在删除表
fo.write("\nDROP TABLE IF EXISTS "+DBname +'.`'+str(table_name_).split('\'')[1]+"`;")
fo.write("\n")
for table_name in result_table_names :
#建表语句
fo.write("\nCREATE TABLE IF NOT EXISTS "+DBname +'.`'+str(table_name).split('\'')[1]+"`(\n")
# 根据表名获取SD_ID
select_table_SD_ID = "select SD_ID from TBLS where tbl_name='"+str(table_name).split('\'')[1]+"' and DB_ID="+DB_ID+";"
print(select_table_SD_ID)
mycursor.execute(select_table_SD_ID)
result_SD_ID = mycursor.fetchone()
print(result_SD_ID )
# 根据SD_ID获取CD_ID
SD_ID=str(result_SD_ID)[1:].split(',')[0]
select_table_CD_ID = "select CD_ID from SDS where SD_ID="+str(result_SD_ID)[1:].split(',')[0]+";"
print(select_table_CD_ID)
mycursor.execute(select_table_CD_ID)
result_CD_ID = mycursor.fetchone()
print(result_CD_ID)
# 根据CD_ID获取表的列
CD_ID=str(result_CD_ID)[1:].split(',')[0]
select_table_COLUMN_NAME = "select COLUMN_NAME,TYPE_NAME,COMMENT from COLUMNS_V2 where CD_ID="+str(result_CD_ID)[1:].split(',')[0]+" order by INTEGER_IDX;"
print(select_table_COLUMN_NAME)
mycursor.execute(select_table_COLUMN_NAME)
result_COLUMN_NAME = mycursor.fetchall()
print(result_COLUMN_NAME)
index=0
for col,col_type,col_name in result_COLUMN_NAME:
print(col)
print(col_type)
print(col_name)
print(len(result_COLUMN_NAME) )
# 写入表的列和列的类型到文件
if col_name is None:
fo.write(" `"+str(col)+"` "+str(col_type))
else:
fo.write(" `"+str(col)+"` "+str(col_type) + " COMMENT '" + str(col_name) + "'")
if index < len(result_COLUMN_NAME)-1:
index = index + 1
fo.write(",\n")
elif index == len(result_COLUMN_NAME)-1:
fo.write("\n)")
# 根据表名获取TBL_ID
select_table_SD_ID = "select TBL_ID from TBLS where tbl_name='"+str(table_name).split('\'')[1]+"' and DB_ID="+DB_ID+";"
print(select_table_SD_ID)
mycursor.execute(select_table_SD_ID)
result_TBL_ID = mycursor.fetchone()
print(result_TBL_ID)
# 根据TBL_ID获取分区信息
select_table_PKEY_NAME_TYPE = "select PKEY_NAME,PKEY_TYPE,PKEY_COMMENT from PARTITION_KEYS where TBL_ID="+str(result_TBL_ID)[1:].split(',')[0]+" order by INTEGER_IDX;"
print(select_table_PKEY_NAME_TYPE)
mycursor.execute(select_table_PKEY_NAME_TYPE)
result_PKEY_NAME_TYPE = mycursor.fetchall()
print(result_PKEY_NAME_TYPE)
if len(result_PKEY_NAME_TYPE) > 0:
fo.write("\nPARTITIONED BY (\n")
else :
fo.write("\n")
i=0
for pkey_name,pkey_type,PKEY_COMMENT in result_PKEY_NAME_TYPE:
if str(PKEY_COMMENT) is None:
fo.write(" `"+str(pkey_name)+"` "+str(pkey_type))
else:
fo.write(" `"+str(pkey_name)+"` "+str(pkey_type) + " COMMENT '" + str(PKEY_COMMENT) + "'\n")
if i < len(result_PKEY_NAME_TYPE)- 1:
i = i + 1
fo.write(",")
elif i == len(result_PKEY_NAME_TYPE) - 1:
fo.write(")\n")
# 根据表TBL_ID 获得中文名称
select_PARAM_VALUE01 = "select PARAM_VALUE from TABLE_PARAMS WHERE TBL_ID=( select TBL_ID from TBLS where tbl_name='"+str(table_name).split('\'')[1]+"' and DB_ID="+DB_ID+") and PARAM_KEY='comment';"
print(select_PARAM_VALUE01)
mycursor.execute(select_PARAM_VALUE01)
result_PARAM_VALUE01 = mycursor.fetchone()
print(result_PARAM_VALUE01)
if result_PARAM_VALUE01 is None:
print('未设置表名')
elif not result_PARAM_VALUE01[0]:
print('表名为空')
else:
fo.write("COMMENT '" + str(result_PARAM_VALUE01[0]) +"' \n" )
# 根据SD_ID和CD_ID获取SERDE_ID
select_SERDE_ID = "select SERDE_ID from SDS where SD_ID="+SD_ID+" and CD_ID="+CD_ID+";"
print(select_SERDE_ID)
mycursor.execute(select_SERDE_ID)
result_SERDE_ID = mycursor.fetchone()
print(result_SERDE_ID)
# 根据SERDE_ID获取PARAM_VALUE(列分隔符)
select_PARAM_VALUE = "select PARAM_VALUE from SERDE_PARAMS where SERDE_ID="+str(result_SERDE_ID)[1:].split(",")[0]+" and PARAM_KEY='field.delim';"
print(select_PARAM_VALUE)
mycursor.execute(select_PARAM_VALUE)
result_PARAM_VALUE = mycursor.fetchone()
print(result_PARAM_VALUE)
if result_PARAM_VALUE is not None:
fo.write("ROW FORMAT DELIMITED\n")
fo.write("FIELDS TERMINATED BY '"+str(result_PARAM_VALUE).split('\'')[1]+"'\n")
# 根据SERDE_ID获取PARAM_VALUE(行分隔符)
select_PARAM_HNAG = "select PARAM_VALUE from SERDE_PARAMS where SERDE_ID="+str(result_SERDE_ID)[1:].split(",")[0]+" and PARAM_KEY='line.delim';"
print(select_PARAM_HNAG)
mycursor.execute(select_PARAM_HNAG)
RESULT_PARAM_HNAG = mycursor.fetchone()
print(RESULT_PARAM_HNAG)
if RESULT_PARAM_HNAG is not None:
fo.write("LINES TERMINATED BY '"+str(RESULT_PARAM_HNAG).split('\'')[1]+"'\n")
# 根据SD_ID和CD_ID获取输入输出格式
select_table_STORE_FORMAT = "select INPUT_FORMAT from SDS where SD_ID="+SD_ID+" and CD_ID="+CD_ID+";"
print(select_table_STORE_FORMAT)
mycursor.execute(select_table_STORE_FORMAT)
result_table_STORE_FORMAT= mycursor.fetchall()
print(result_table_STORE_FORMAT)
for store_format in result_table_STORE_FORMAT:
if "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat" in str(store_format):
fo.write("STORED AS ORC;\n")
elif "org.apache.hadoop.mapred.TextInputFormat" in str(store_format):
fo.write("STORED AS TEXTFILE\n")
#fo.write("location "+"'/tmp/hive/"+DBname.lower()+"/"+str(table_name).split('\'')[1].lower()+"';\n")
elif "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat" in str(store_format):
fo.write("STORED AS PARQUET\n")
fo.write("TBLPROPERTIES('parquet.compression'='snappy');\n")
elif "org.apache.kudu.mapreduce.KuduTableInputFormat" in str(store_format):
fo.write("STORED AS KuduTable;\n")
else :
fo.write("STORED AS null;\n")
fo.close()
hive_create_table()
31. html2txt
import requests
url = "https://sc.sfc.hk/TuniS/www.sfc.hk/TC/Regulatory-functions/Corporates/Takeovers-and-mergers/offer-periods"
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "zh-CN,zh;q=0.9",
# "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
}
def request_get(url):
"""
数据请求
:param url: 请求的url
:return: 返回html的文本
"""
try:
response = requests.get(url, headers=headers)
with open(r"C:\luyz\temp\html_con.txt", "w") as f:
f.write(response.text)
return response.text
except Exception as e:
print("[ERROR]:" + str(e))
exit(-1)
if __name__ == '__main__':
try:
response_text = request_get(url)
except Exception as e:
print("[ERROR]:" + str(e))
exit(-1)
32.xml2excel
import requests
from lxml import etree
import pandas as pd
import os
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
url = "https://www.XXX.com/images/fund/fund_2697.xml"
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"Accept-Language": "zh-CN,zh;q=0.9",
# "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0",
}
def save_data(data, columns, excel_path, sheet_name):
df = pd.DataFrame(data, columns=columns)
if not os.path.exists(excel_path):
df.to_excel(excel_path, sheet_name=sheet_name, index=False)
else:
with pd.ExcelWriter(excel_path, engine='openpyxl', mode='a') as writer:
df.to_excel(writer, sheet_name=sheet_name, index=False)
def xml2list(response_text):
# 创建ElementTree对象并解析XML文档
root = etree.fromstring(response_text.encode('utf-8'))
excel_columns = ['Fund_code',
'shortname',
'Fund_name',
'Fund_investtype',
'Fund_desc',
'Fund_qcode',
'Fund_currency',
'Fund_trusteefee',
'Fund_type',
'Fund_manager',
'Fund_objective',
'Fund_trustee',
'Fund_risklevel',
'Fund_benmark',
'Fund_riskreturn',
'Fund_eatablishment',
'Fund_endtime',
'Fund_riskmanagement',
'Fund_issuedate',
'Fund_customertyp',
'Fund_administration',
'Fund_subscription',
'Fund_management',
'Fund_IP',
'Fund_sp',
'Fund_hotfund',
'Fund_scope'
]
excel_data = []
# 使用XPath定位元素并打印内容
funds = root.xpath('//Fund')
for fund in funds:
excel_row_data = []
for column_index in range(len(excel_columns)):
if column_index != len(excel_columns) - 1:
excel_row_data.extend(fund.xpath('@' + str(excel_columns[column_index])))
else:
excel_row_data.extend(fund.xpath(str(excel_columns[column_index]) + '/text()'))
"""excel_row_data.extend(fund.xpath('@Fund_code'))
excel_row_data.extend(fund.xpath('Fund_scope/text()'))"""
excel_data.append(excel_row_data)
return excel_columns, excel_data
if __name__ == '__main__':
try:
excel_path = "C:/luyz/temp/20231207/xml.xlsx"
sheet_name = 'result_data'
response_text = requests.get(url, headers=headers, timeout=(21, 300)).content.decode("utf8")
excel_columns, excel_data = xml2list(response_text)
print("=================excel_columns=======================")
print(excel_columns)
print("=================excel_data==========================")
for x in excel_data:
print(x)
print("=====================================================")
# 文件存在,则删除
if os.path.exists(excel_path):
os.remove(excel_path)
# 保存文件
save_data(excel_data, excel_columns, excel_path, sheet_name)
print("save_data is end.")
except Exception as e:
print("[ERROR]:" + str(e))
exit(-1)
33.
配置表结构
-- Create table
create table REPKETTLE.CTL_B2B_CONFIG
(
filename VARCHAR2(100) not null,
src_db_name VARCHAR2(100) not null,
src_table VARCHAR2(50) not null,
src_query VARCHAR2(4000) not null,
trg_db_name VARCHAR2(200) not null,
trg_table VARCHAR2(200) not null,
trg_writemode VARCHAR2(2) not null,
truncate_sql VARCHAR2(4000) not null,
load_type VARCHAR2(2) not null,
trg_schema VARCHAR2(50)
);
-- Add comments to the columns
comment on column REPKETTLE.CTL_B2B_CONFIG.filename
is '下载表的系统名+表名(该字段是下载表的主键)';
comment on column REPKETTLE.CTL_B2B_CONFIG.src_db_name
is '源数据库名称,需使用ctl_db_info定义的db_name';
comment on column REPKETTLE.CTL_B2B_CONFIG.src_table
is '源表名';
comment on column REPKETTLE.CTL_B2B_CONFIG.src_query
is '导出语句';
comment on column REPKETTLE.CTL_B2B_CONFIG.trg_db_name
is '目标数据库名称,需使用ctl_db_info定义的db_name';
comment on column REPKETTLE.CTL_B2B_CONFIG.trg_table
is '目标表名';
comment on column REPKETTLE.CTL_B2B_CONFIG.trg_writemode
is 'I:为增量模式,F:为全量模式(标识作用)';
comment on column REPKETTLE.CTL_B2B_CONFIG.truncate_sql
is '删数语句';
comment on column REPKETTLE.CTL_B2B_CONFIG.load_type
is '加载类型;1-O2M;2-O2O;3-M2O';
comment on column REPKETTLE.CTL_B2B_CONFIG.trg_schema
is '目标表模式';
-- Create table
create table REPKETTLE.CTL_DB_INFO
(
db_name VARCHAR2(50),
db_type VARCHAR2(1) default '0',
db_user VARCHAR2(100),
db_passwd VARCHAR2(100),
db_host VARCHAR2(100),
db_port INTEGER,
db_instant VARCHAR2(100),
base_passad VARCHAR2(100),
charset VARCHAR2(50)
);
-- Add comments to the table
comment on table REPKETTLE.CTL_DB_INFO
is '数据库配置';
-- Add comments to the columns
comment on column REPKETTLE.CTL_DB_INFO.db_name
is '数据库名';
comment on column REPKETTLE.CTL_DB_INFO.db_type
is '数据库类型-0:oracle,1:mysql,默认为0';
comment on column REPKETTLE.CTL_DB_INFO.db_user
is '登陆用户';
comment on column REPKETTLE.CTL_DB_INFO.db_passwd
is '登录密码(base64加密之后)';
comment on column REPKETTLE.CTL_DB_INFO.db_host
is '数据库IP';
comment on column REPKETTLE.CTL_DB_INFO.db_port
is '数据端口号';
comment on column REPKETTLE.CTL_DB_INFO.db_instant
is '实例名';
comment on column REPKETTLE.CTL_DB_INFO.base_passad
is '存放密码-备用';
comment on column REPKETTLE.CTL_DB_INFO.charset
is '数据库字符集编码';
代码
# -*- coding:utf-8 -*-
# 安装依赖
# pip install PyMySQL-0.9.3-py2.py3-none-any.whl
import sys
import os
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
sys.path.append(os.path.abspath(sys.path[0]) + os.path.sep + "../utils")
import logging.config
import cx_Oracle
import pymysql
import datetime
import traceback
import time
import ArgvParser
import GetOracleConfig
import GetConfig
import GetParamsConfig
import LogPathDao
import QueryFromArgs
import QueryFromGlobalVal
#类似nvl()函数
def nvlx(x, y):
if x is None:
return y
else:
return x
def close_conn(conn,cursor):
cursor.close()
conn.close()
#获取数据库连接
def get_con(user, password, db_host, db_port, db_instant, db_charset,db_type,logger):
try:
con=None
if db_type == '0':
dsn = cx_Oracle.makedsn(db_host, db_port, db_instant)
con = cx_Oracle.connect(user, password, dsn)
if db_type == '1':
con = pymysql.Connect(host=db_host,
port=db_port,
user=user,
password=password,
database=db_instant,
charset=db_charset)
logger.info("user: {}, password: {}, db_host: {}, db_port: {}, db_instant: {}, db_charset: {},db_type: {}".format(user, "******", db_host, db_port, db_instant, db_charset,db_type))
return con
except cx_Oracle.DatabaseError:
traceback.print_exc()
os._exit(1)
#读取数据,加载数据以及字段
def read_from_db(conn, sql, logger):
try:
logger.info("execute Read SQL: {}".format(sql))
cursor = conn.cursor()
cursor.execute(sql)
db_columns = []
db_data = []
for column in cursor.description:
db_columns.append(column[0])
#data_from_oracle = cursor.fetchall()
logger.info("Get {} columns".format(db_columns))
for row in cursor:
db_data.append(row)
logger.info("Get total {} rows".format(len(db_data)))
close_conn(conn,cursor)
logger.info("Connect to DB closed.")
return db_columns,db_data
except Exception:
exe = traceback.format_exc()
print (exe)
close_conn(conn,cursor)
sys.exit(1)
#获取插入表的sql语句
def get_insert_sql(db_columns, db_data, trg_table, db_type, logger):
if len(db_data) == 0:
return None
#mysql %s,%S
#Oracle :1,:2
tmp_key=''
tmp_value=''
for i in range(1, len(db_columns)+1):
tmp_value+=':'+str(i)+','
tmp_key+="\""+db_columns[i-1]+"\","
value=tmp_value[:-1]
key=tmp_key[:-1]
insert_sql=None
if db_type == '1':
insert_sql = f"INSERT INTO {trg_table} (`{'`,`'.join(db_columns)}`) VALUES ({','.join(['%s']*len(db_columns))})"
if db_type == '0':
insert_sql = f"INSERT INTO {trg_table} ({key}) VALUES ({value})"
logger.info("Insert SQL: {}".format(insert_sql))
return insert_sql
#执行sql写入数据
def write_to_db(conn, db_columns, db_data, insert_sql, truncate_sql, logger):
try:
logger.info("Delete SQL: {}".format(truncate_sql))
cursor = conn.cursor()
cursor.execute(truncate_sql)
logger.info("Truncate table success!")
effectRow = 0
if insert_sql is not None:
#10000条提交一次
for i in range(0, len(db_data), 10000):
start = time.time()
db_data_1w = db_data[i: i + 10000]
logger.info("Write SQL: {}".format(insert_sql))
#logger.info(db_data_1w)
cursor.executemany(insert_sql, db_data_1w)
#cursor.execute(sql)
conn.commit()
effectRow += cursor.rowcount
#time.sleep(300)
end = time.time()
running_time = end-start
logger.info('Write data is : %.5f sec' %running_time)
logger.info("Write data to table success! Effect row: {}".format(effectRow))
else:
logger.info("Total length of data is zero, no insert!")
close_conn(conn,cursor)
logger.info("Connect to DB closed.")
return effectRow
except Exception:
exe = traceback.format_exc()
print (exe)
close_conn(conn,cursor)
sys.exit(1)
def audit(s_retval,t_retval):
s_retval = nvlx(s_retval,0)
t_retval = nvlx(t_retval,0)
logger.info("s_retval: {}".format(s_retval))
logger.info("t_retval: {}".format(t_retval))
vn_biasval = abs(s_retval - t_retval)
if abs(vn_biasval) > 0: # Decimal
logger.info("This audit-rule is not passed!!!")
sys.exit(1)
else:
logger.info("This audit-rule is passed")
if __name__ == '__main__':
parentdir = os.path.abspath(sys.path[0] + os.path.sep + "..")
rootpath = os.path.abspath(parentdir + os.path.sep + "..")
confpath = rootpath + "/config/config.ini"
# 获取配置文件配置信息
GetC = GetConfig.GetConfig(confpath)
username, password, url, url2 = GetC.config()
# 返回参数字典
GetParamsC = GetParamsConfig.GetParamsConfig(username, password, url, url2)
argv_type, globaldict = GetParamsC.getconfig()
# 构建传递参数
ArgP = ArgvParser.ArgvParser(1)
argvdict = ArgP.parse(argv_type)
# 将tablename计入参数与类型字典
argv_type["filename"] = "string"
sqlcommand = '''SELECT T.SRC_QUERY,
T.TRUNCATE_SQL,
T.TRG_SCHEMA||'.'|| T.TRG_TABLE AS TRG_TABLE,
SDB.DB_USER AS SDB_USER,
--SDB.DB_PASSWD AS SDB_PASSWD,
UTL_RAW.CAST_TO_VARCHAR2(UTL_ENCODE.BASE64_DECODE(UTL_RAW.CAST_TO_RAW(SDB.DB_PASSWD))) AS SDB_PASSWD,
SDB.DB_HOST AS SDB_HOST,
SDB.DB_PORT AS SDB_PORT,
SDB.DB_INSTANT AS SDB_INSTANT,
SDB.DB_TYPE AS SDB_TYPE,
SDB.CHARSET AS SDB_CHARSET,
TDB.DB_USER AS TDB_USER,
--TDB.DB_PASSWD AS TDB_PASSWD,
UTL_RAW.CAST_TO_VARCHAR2(UTL_ENCODE.BASE64_DECODE(UTL_RAW.CAST_TO_RAW(TDB.DB_PASSWD))) AS TDB_PASSWD,
TDB.DB_HOST AS TDB_HOST,
TDB.DB_PORT AS TDB_PORT,
TDB.DB_INSTANT AS TDB_INSTANT,
TDB.DB_TYPE AS TDB_TYPE,
TDB.CHARSET AS TDB_CHARSET
FROM REPKETTLE.CTL_B2B_CONFIG T
JOIN REPKETTLE.CTL_DB_INFO SDB
ON T.SRC_DB_NAME = SDB.DB_NAME
JOIN REPKETTLE.CTL_DB_INFO TDB
ON T.TRG_DB_NAME = TDB.DB_NAME
WHERE UPPER(FILENAME) = UPPER('{filename}') '''
sql = sqlcommand.format(filename=argvdict["filename"])
GetOracleC = GetOracleConfig.GetOracleConfig(username, password, url, url2, sql)
diction = GetOracleC.getconfig()
# 用全局变量替换掉字段里的$${.*}
QueryFGL = QueryFromGlobalVal.QueryFromGlobalVal(globaldict)
for key, value in list(diction.items()):
if type(value) == str:
diction[key] = QueryFGL.query(value)
# 获取TRG_TABLE
trg_table = diction["TRG_TABLE"]
# 对query字段做处理
QueryFA = QueryFromArgs.QueryFromArgs(argvdict, argv_type)
src_query = QueryFA.query(diction["SRC_QUERY"])
truncate_sql = QueryFA.query(diction["TRUNCATE_SQL"])
# 获取logging模块的logger
LogDao = LogPathDao.LogPathDao(argvdict["filename"])
logconf, logpath = LogDao.getpath()
logging.config.fileConfig(logconf, defaults={'logdir': logpath})
logger = logging.getLogger('hivelog')
try:
#1.获取数据库连接
logger.info("Source Conn")
sconn = get_con(diction["SDB_USER"]
,diction["SDB_PASSWD"]
,diction["SDB_HOST"]
,diction["SDB_PORT"]
,diction["SDB_INSTANT"]
,diction["SDB_CHARSET"]
,diction["SDB_TYPE"]
,logger)
logger.info("Trg Conn")
tconn = get_con(diction["TDB_USER"]
,diction["TDB_PASSWD"]
,diction["TDB_HOST"]
,diction["TDB_PORT"]
,diction["TDB_INSTANT"]
,diction["TDB_CHARSET"]
,diction["TDB_TYPE"]
,logger)
#2.读取数据,加载数据以及字段
db_columns,db_data = read_from_db(sconn, src_query, logger)
#3.获取插入表的sql语句
insert_sql = get_insert_sql(db_columns, db_data, trg_table, diction["TDB_TYPE"], logger)
#4.执行sql写入数据
effectRow = write_to_db(tconn, db_columns, db_data, insert_sql, truncate_sql, logger)
#5.稽核
audit(len(db_data),effectRow)
except Exception:
exe = traceback.print_exc()
logger.error("Error occurred!")
logger.error(str(exe))
os._exit(1)