当前位置: 首页 > article >正文

Dolphinscheduler配置dataX离线采集任务写入hive实践(二)

这里写目录标题

  • 一、 写入hive 配置
    • 1.1 权限报错信息 :
    • 1.2 hive 中文件格式
    • 1.3 注意区别以下建表语句
      • A、构建ORC 格式分区表
      • B. 构建默认文件格式分区表
      • C.构建非分区表
  • 二、dataX 配置hive 分区表导入 配置
      • 2.1 检查hive 表分区是否存在

在这里插入图片描述

一、 写入hive 配置

dataX 写入hive 不支持直接配置select 语句,必须配置一个json 任务,推荐使用hdfswriter

 "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://hadoop.fancv.com:9000",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/fancv_center_devdb.db/test_p_text/ct=2020",
                        "fileName": "test_p_file",
                        "column": [
                            {
                                "name": "id",
                                "type": "INT"
                            },
                            {
                                "name": "name",
                                "type": "STRING"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": "\u0001",
                        "compress":"GZIP"
                    }
                }

hdfswriter 时直接写文件,所以回遇到权限问题,上一篇文章中 直接粗暴解决 chmod
但系统和内hive表一般时动态增加的,总不能一直靠着命令行解决问题吧,后边研究了Hadoop的权限校验。

发现可以在系统中配置环境变量指定用户,这样就可以在dolphins中通过dataX 直接向hive 中写入数据了

1.1 权限报错信息 :

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=default, access=WRITE, inode="/user/hive/warehouse/sz_center_devdb.db/test_p":anonymous:supergroup:drwxr-xr-x
                at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:496)
                at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:336)
                at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:241)
                at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1909)
                at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1893)
                at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1852)
                at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:323)
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2635)
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2577)
                at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:807)
                at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:494)
                at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
                at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:532)
                at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
                at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1020)
                at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:948)
                at java.security.AccessController.doPrivileged(Native Method)
                at javax.security.auth.Subject.doAs(Subject.java:422)
                at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
                at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2952)

                at org.apache.hadoop.ipc.Client.call(Client.java:1476)
                at org.apache.hadoop.ipc.Client.call(Client.java:1407)
                at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
                at com.sun.proxy.$Proxy10.create(Unknown Source)
                at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:296)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
                at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
                at com.sun.proxy.$Proxy11.create(Unknown Source)
                at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1623)
                ... 14 more

                at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:40)
                at com.alibaba.datax.plugin.writer.hdfswriter.HdfsHelper.textFileStartWrite(HdfsHelper.java:317)
                at com.alibaba.datax.plugin.writer.hdfswriter.HdfsWriter$Task.startWrite(HdfsWriter.java:360)
                at com.alibaba.datax.core.taskgroup.runner.WriterRunner.run(WriterRunner.java:56)
                at java.lang.Thread.run(Thread.java:750)
        Caused by: org.apache.hadoop.security.AccessControlException: Permission denied: user=default, access=WRITE, inode="/user/hive/warehouse/sz_center_devdb.db/test_p":anonymous:supergroup:drwxr-xr-x
                at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:496)
                at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:336)
                at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:241)
                at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1909)
                at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1893)
                at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1852)
                at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.resolvePathForStartFile(FSDirWriteFileOp.java:323)
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2635)
                at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2577)
                at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:807)
                at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:494)
                at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
                at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:532)
                at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
                at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1020)
                at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:948)
                at java.security.AccessController.doPrivileged(Native Method)
                at javax.security.auth.Subject.doAs(Subject.java:422)
                at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
                at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2952)

这里可以通过设置环境变量的方式解决

详情参考: fancv.com
参考资料:https://www.cnblogs.com/chhyan-dream/p/12929362.html

1.2 hive 中文件格式

hive 文件保存有格式区别,如果在dataX 任务里,没有细心核对hive 文件格式,虽然表面上dataX 任务执行成功,但是hive的文件却被破坏了,导致不可用。

报错信息如下:

org.jkiss.dbeaver.model.exec.DBCException: SQL 错误: java.io.IOException: org.apache.orc.FileFormatException: Malformed ORC file hdfs://shangzhi165:9000/user/hive/warehouse/sz_center_devdb.db/test_p/ct=2020/test_p_file__4f4b257d_b0ad_446c_a3ea_e5ea2e81580c.gz. Invalid postscript length 0
	at org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCResultSetImpl.nextRow(JDBCResultSetImpl.java:183)
	at org.jkiss.dbeaver.model.impl.jdbc.struct.JDBCTable.readData(JDBCTable.java:195)
	at org.jkiss.dbeaver.ui.controls.resultset.ResultSetJobDataRead.lambda$0(ResultSetJobDataRead.java:123)
	at org.jkiss.dbeaver.model.exec.DBExecUtils.tryExecuteRecover(DBExecUtils.java:173)
	at org.jkiss.dbeaver.ui.controls.resultset.ResultSetJobDataRead.run(ResultSetJobDataRead.java:121)
	at org.jkiss.dbeaver.ui.controls.resultset.ResultSetViewer$ResultSetDataPumpJob.run(ResultSetViewer.java:5062)
	at org.jkiss.dbeaver.model.runtime.AbstractJob.run(AbstractJob.java:105)
	at org.eclipse.core.internal.jobs.Worker.run(Worker.java:63)
Caused by: org.apache.hive.service.cli.HiveSQLException: java.io.IOException: org.apache.orc.FileFormatException: Malformed ORC file hdfs://shangzhi165:9000/user/hive/warehouse/sz_center_devdb.db/test_p/ct=2020/test_p_file__4f4b257d_b0ad_446c_a3ea_e5ea2e81580c.gz. Invalid postscript length 0
	at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:300)
	at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:286)
	at org.apache.hive.jdbc.HiveQueryResultSet.next(HiveQueryResultSet.java:374)
	at org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCResultSetImpl.next(JDBCResultSetImpl.java:272)
	at org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCResultSetImpl.nextRow(JDBCResultSetImpl.java:180)
	... 7 more
Caused by: org.apache.hive.service.cli.HiveSQLException: java.io.IOException: org.apache.orc.FileFormatException: Malformed ORC file hdfs://shangzhi165:9000/user/hive/warehouse/sz_center_devdb.db/test_p/ct=2020/test_p_file__4f4b257d_b0ad_446c_a3ea_e5ea2e81580c.gz. Invalid postscript length 0
	at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:465)
	at org.apache.hive.service.cli.operation.OperationManager.getOperationNextRowSet(OperationManager.java:309)
	at org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:905)
	at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
	at org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
	at org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1845)
	at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
	at com.sun.proxy.$Proxy37.fetchResults(Unknown Source)
	at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:561)
	at org.apache.hive.service.cli.thrift.ThriftCLIService.FetchResults(ThriftCLIService.java:786)
	at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1837)
	at org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults.getResult(TCLIService.java:1822)
	at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
	at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: org.apache.orc.FileFormatException: Malformed ORC file hdfs://shangzhi165:9000/user/hive/warehouse/sz_center_devdb.db/test_p/ct=2020/test_p_file__4f4b257d_b0ad_446c_a3ea_e5ea2e81580c.gz. Invalid postscript length 0
	at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:602)
	at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:509)
	at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:146)
	at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:2691)
	at org.apache.hadoop.hive.ql.reexec.ReExecDriver.getResults(ReExecDriver.java:229)
	at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:460)
	... 24 more
Caused by: java.lang.RuntimeException: org.apache.orc.FileFormatException:Malformed ORC file hdfs://shangzhi165:9000/user/hive/warehouse/sz_center_devdb.db/test_p/ct=2020/test_p_file__4f4b257d_b0ad_446c_a3ea_e5ea2e81580c.gz. Invalid postscript length 0
	at org.apache.orc.impl.ReaderImpl.ensureOrcFooter(ReaderImpl.java:260)
	at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:570)
	at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
	at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:61)
	at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:96)
	at org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getRecordReader(OrcInputFormat.java:1971)
	at org.apache.hadoop.hive.ql.exec.FetchOperator$FetchInputFormatSplit.getRecordReader(FetchOperator.java:776)
	at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:344)
	at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:540)
	... 29 more

以上就是把ORC 的文件当错text 来处理,导致异常。

我们要在配置datax 任务的时候,一定要仔细核对。

1.3 注意区别以下建表语句

A、构建ORC 格式分区表

CREATE TABLE IF NOT EXISTS `test_p` (
  `id` int COMMENT 'date in file', 
  `name` string COMMENT 'appname' )
COMMENT 'cleared log of origin log'
PARTITIONED BY (
  `ct`  string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS ORC
TBLPROPERTIES ('creator'='c-chenjc', 'crate_time'='2018-06-07')
;

B. 构建默认文件格式分区表

CREATE TABLE IF NOT EXISTS `test_p_text` (
  `id` int COMMENT 'date in file', 
  `name` string COMMENT 'appname' )
COMMENT 'cleared log of origin log'
PARTITIONED BY (
  `ct`  string
)

C.构建非分区表

CREATE TABLE IF NOT EXISTS `my_test_p` (
  `id` int COMMENT 'date in file', 
  `name` string COMMENT 'appname' ,
  `ct`  string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS ORC
TBLPROPERTIES ('creator'='c-chenjc', 'crate_time'='2018-06-07')
;

二、dataX 配置hive 分区表导入 配置

dataX 导入hive 分区表必须指定分区,否则的话,任务执行成功,没有返回异常,但是呢,数据没写入。

示例:

{
        "job": {
                "setting": {
                        "speed": {
                                "channel": 1
                        }
                },
                "content": [{
                        "reader": {
                                "name": "mysqlreader",
                                "parameter": {
                                        "column": ["id","name"],
                                        "password": "xxxxx$xx",
                                        "username": "xxxx",
                                        "where": "",
                                        "connection": [{
                                                "jdbcUrl": ["jdbc:mysql://x x xx:3306/web_magic"],
                                                "table": ["mysql_test_p"]
                                        }]
                                }
                        },
                        "writer": {
                                "name": "hdfswriter",
                                "parameter": {
                                        "column": [{
                                                        "name": "id",
                                                        "type": "int"
                                                },
                                                {
                                                        "name": "name",
                                                        "type": "string"
                                                }

                                        ],
                                        "compress": "",
                                        "defaultFS": "hdfs://xxxxx:xxx",
                                        "fieldDelimiter": ",",
                                        "fileName": "test_p",
                                        "fileType": "text",
                                        "path": "/user/hive/warehouse/testjar.db/test_p/ct=shanghai",
                                        "writeMode": "append"
                                }
                        }
                }]

        }
}



注意:你在导入的时候这个分区还必须存在,否则报异常。

2.1 检查hive 表分区是否存在

show partitions test_p;

http://www.kler.cn/a/390685.html

相关文章:

  • react 中 FC 模块作用
  • javascript 函数【知识点整理】
  • 免费HTML模板和CSS样式网站汇总
  • Rust 建造者模式
  • 解决C盘空间不足的三种方案
  • JavaScript数组去重的实用方法汇总
  • 前端请求后端php接口跨域 cors问题
  • Android Settings 单元测试 | 如何运行单元测试?
  • sanitize-html 防止 XSS(跨站脚本攻击)
  • linux 用C语言编写自己的myshell
  • libgdiplus在MacOS M1上问题:Unable to load shared library ‘libgdiplus‘
  • unity3d————协程练习题
  • 简记Vue3(五)—— Pinia
  • C++ 错题本--duplicate symbol问题
  • linux文本管理!!!
  • 实在智能受邀出席柳州市智能终端及机器人产业发展合作大会
  • 【车道线检测】一、传统车道线检测:基于霍夫变换的车道线检测史诗级详细教程
  • AI引领PPT创作:迈向“免费”时代的新篇章?
  • Git超详细教程
  • 二、前端学习:个人主页网站搭建
  • vue实现拖拽,可实现排序拖拽和自由拖拽(vuedraggable)
  • lnmp:自己的“百度网盘”
  • SpringBoot自动装配原理解析
  • Android 生成并加载PDF文件
  • 豆包大模型团队开源RLHF框架,破解强化学习训练部署难题
  • AI猫娘第二弹 | 基于Text和Chat模型实现文本生成