当前位置: 首页 > article >正文

学习大数据DAY57 新的接口配置

作业

完成 API 接口和文件的接入, 并部署到生产调度平台, 每个任务最后至少
要有两条 不报错 的日志, 报错就驳回作业
作业不需要复制日志
API = Appliation Program Interface 应用程序接口 => JSON 的地址
客户需求: 把
https://zhiyun.pub:9099/site/c-class?page=1 所有数据定 时同步到 Hive 数仓
分析 分页数据
https://zhiyun.pub:9099/site/c-class?page=1
https://zhiyun.pub:9099/site/c-class?page=2
...
https://zhiyun.pub:9099/site/c-class?page=20
技术: Python + requests 请求库
需要的依赖包
pip install requests hdfs
c_org_busi.py:
#!/bin/python3
import requests
from hdfs import *
import os
# *客户需求: 把 https://zhiyun.pub:9099/site/c-class?page=1 所有
数据定时同步到 Hive 数仓
lines = []
page = 1
pages = 1
def get_data(page=1):
global pages
print(f"正在抽取第{page}页的数据")
url = f"https://zhiyun.pub:9099/site/c-class?page={page}"
r = requests.get(url)
data = r.json()
if data["status"] == 1:
page_data = data["data"]
# 更新页数
pages = data["pages"]
# print(f"pages: {pages}")
# print(page_data)
for item in page_data:
# {'id': '1', 'levels': '1', 'classcode': '01',
'classname': '中西成药', 'saletax': '0.00', 'createtime':
'1900-01-20 11:16:47', 'createuser': '1002', 'notes': 'null',
'stamp': '562664386'}
# 字典 => Hive 数据格式# A B C D ...
values = item.values()
# print(values)
# dict_values(['1', '1', '01', '中西成药', '0.00',
'1900-01-20 11:16:47', '1002', 'null', '562664386'])
# 把所有元素转换成字符串
str_list = []
for value in values:
str_list.append(f"{value}")
# print(str_list)
# ['1', '1', '01', '中西成药', '0.00', '1900-01-20
11:16:47', '1002', 'null', '562664386']
# 字符串的 join 方法, 把列表的所有元素拼接起来
line = "\t".join(str_list)
# print(line)
# 1
1
01
中西成
药
0.00
1900-01-20
11:16:47
1002
null
562664386
lines.append(line)
# print(lines)
def do_get_data():
global page, pages
while page <= pages:
# print(f"pages: {pages}")
get_data(page)
# print(f"pages: {pages}")
page = page + 1
# 写入到数据文件
with
open("/zhiyun/shihaihong/data/c_class.data","w",encoding="utf-
8") as f:
content = "\n".join(lines)
f.write(content)
print("文件写入成功")
# 上传到 HDFSdef upload_data_hdfs():
# 创建 HDFS 目录
client = Client("http://192.168.200.100:9870")
client.makedirs("/zhiyun/shihaihong/ods/c_class")
# 上传
# 注意再次上传会报已存在错误
client.upload("/zhiyun/shihaihong/ods/c_class",
"/zhiyun/shihaihong/data/c_class.data");
print("上传成功")
# Hive 建表
def craete_hive_table():
os.system('''
hive -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.c_class(
id int,
levels int,
classcode string,
classname string,
saletax decimal(10,2),
createtime timestamp,
createuser string,
notes string,
stamp int
) row format delimited fields terminated by "\t"
lines terminated by "\n"
stored as textfile
location "/zhiyun/shihaihong/ods/c_class";
'
''')
# 验证数据
def check_data():
os.system('''
hive -e '
select count(1) from ods_shihaihong.c_class;
select * from ods_shihaihong.c_class limit 5;
'
''')# 爬取数据
do_get_data()
upload_data_hdfs()
craete_hive_table()
check_data()
运行后验证:
部署到调度平台:
执行前,要现在 hdfs 建立文件:
hadoop fs -mkdir -p /zhiyun/lijinquan/ods/c_class
把之前上传的数据清空,然后用任务调度再执行一遍。
执行一次,查看执行日志:
文件接入:
需求: 定时下载
https://zhiyun.pub:9099/设备清单.xlsx
, 然后上传到
HDFS 建表,更新
c_tools.py:
#!/bin/python3
import requests
from hdfs import *
import os
import pandas
# *客户需求: 定时下载 https://zhiyun.pub:9099/设备清单.xlsx , 然后
上传到 HDFS 建表,更新
def get_data():
print(f"正在抽取数据")
url = f"https://zhiyun.pub:9099/设备清单.xlsx"
r = requests.get(url)
if r.status_code == 200:
with open("/zhiyun/shihaihong/data/download_设备清
单.xlsx","wb") as f:
f.write(r.content)
df=pandas.read_excel("/zhiyun/shihaihong/data/downl
oad_设备清单.xlsx")df.to_csv("/zhiyun/shihaihong/data/download_设备清
单.csv",index=False,header=False)
print("文件下载成功")
f.close()
else:
print("文件下载失败")
# 上传到 HDFS
def upload_data_hdfs():
# 创建 HDFS 目录
client = Client("http://192.168.200.100:9870")
client.makedirs("/zhiyun/shihaihong/ods/c_tools")
# 上传
# 注意再次上传会报已存在错误
client.upload("/zhiyun/shihaihong/ods/c_tools",
"/zhiyun/shihaihong/data/download_设备清单.csv");
print("上传成功")
# Hive 建表
def create_hive_table():
os.system('''
hive -e '
create database if not exists ods_shihaihong location
"/zhiyun/shihaihong/ods";
create external table if not exists ods_shihaihong.c_tools(
id int,
hospital string,
tool_name string,
manufacturer string,
produce_date string,
administrative_officer string,
picture string
) row format delimited fields terminated by ","
lines terminated by "\n"
stored as textfile
location "/zhiyun/shihaihong/ods/c_tools";
'
''')
# 验证数据
# def check_data():#
os.system('''
#
hive -e '
#
select count(1) from ods_shihaihong.c_class;
#
select * from ods_shihaihong.c_class limit 5;
# '
#
''')
# 爬取数据
get_data()
upload_data_hdfs()
create_hive_table()
# check_data()
运行结束后验证:
检查 HDFS 是否有上传文件:
检查 Hive 数据库中是否有此库:
任务定时调度:
登录老师的任务调度中心:
编辑 GLUE IDE
跟前面差不多,就是生产调度的 HDFS 路径需要注意修改
client = Client("http://cdh02:9870")
执行日志:
把上一个脚本的代码也写入生产调度中心:

http://www.kler.cn/news/306226.html

相关文章:

  • AI学习指南深度学习篇-RMSprop的数学原理
  • Python 课程11-Web 开发
  • Android 10.0 mtk平板camera2横屏预览旋转90度横屏保存圆形预览缩略图旋转90度功能实现
  • 蓝桥杯3. 压缩字符串
  • 掌握远程管理的艺术:揭秘Python的pywinrm库
  • 【OJ刷题】双指针问题3
  • 通义灵码在Visual Studio上
  • spring-TransactionTemplate 编程式事务
  • C#笔记10 Thread类怎么终止(Abort)和阻止(Join)线程
  • SQLite的入门级项目学习记录(四)
  • [项目][WebServer][Task]详细讲解
  • python绘制3d建筑
  • flask-sqlalchemy的模型类两个表,既有一对一又有一对多的情况时,解决方法
  • SAP HCM HR_ABS_ATT_TIMES_AT_ENTRY 跨夜班不生效问题
  • 【MyBatis精讲】从入门到精通的详细指南:简化Java持久层操作的艺术
  • 开源 AI 智能名片小程序:开启内容营销新境界
  • Harmony Next 文件命令操作(发送、读取、媒体文件查询)
  • 【最佳实践】配置类封装-Async异步注解以及自定义线程池
  • 对操作系统(OS)管理和进程的理解
  • 28 线性表 · 栈
  • golang的GC(三色标记法+混合写屏障)学习笔记
  • 第一篇---滑动窗口最大值、前 K 个高频元素
  • 初识爬虫2
  • Linux删除SSH生成的密钥对
  • 探索Python的Excel世界:openpyxl的魔法之旅
  • 【homebrew安装】踩坑爬坑教程
  • 路由策略原理与配置
  • C#笔记11 获取线程及其信息,什么是优先级、单元状态、线程状态、执行状态、线程名称以及其他属性?
  • 一文速通calcite结合flink理解SQL从文本变成执行计划详细过程
  • Kubernetes Pod镜像的3种状态