当前位置: 首页 > article >正文

kettle工具使用从入门到精通(二)-------Java代码案例

Java代码步骤,属于脚本类别转换,是指对上游数据使用Java代码处理之后往下游发送。Java代码步骤,适用于熟悉Java语言的开发人员,用好这个步骤,需要对类、接口、多线程等语言相关知识有所掌握,并且需要对Kettle的基础框架有所理解。Kettle转换的执行,通常有以下三个核心的阶段:

核心代码

1. 初始化

​ Kettle转换在执行前,会有一个各步骤的初始化动作,为步骤执行前的准备工作创造机会。为提高初始化的性能,Kettle为每个步骤启用一个初始化线程,从而并行完成所有步骤的初始化。初始化的主要内容就是调用一次步骤的以下方法:

public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) {
  return parent.initImpl(stepMetaInterface, stepDataInterface);
}

此方法包含两个参数。其中,meta为元数据,data为数据。如果返回true,那么代表初始化成功,否则代表初始化失败。任何一个步骤初始化失败,都会导致整个转换停止执行(在停止前,会调用每一个转换的资源释放方法dispose)。

2. 执行

​ 执行阶段是每一个步骤实现特定操作的时候,操作通常就是指Main函数中的行数据处理,行处理核心代码如下:

/**
从输入行集中判断是否第一行,判断目的是将其标识,第一行需要特定处理,然后将新的行放入输出行集中。
从输入行集中取数据可以调用getRow方法。
  如果getRow方法返回值不为null,则步骤应将该行数据进行处理,并调用putRow方法将处理结果存入输出行集,然后返回true,以继续为下一行输入数据处理提供机会。
  如果getRow方法返回null,代表输入行集已经处理完毕,这时可以调用setOutputDone,标识本步骤执行完毕,并返回false,以结束本工作线程的执行。
*/
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
  if (first) {
    first = false;
  }

  Object[] r = getRow();
  if (r == null) {
    setOutputDone();
    return false;
  }

  r = createOutputRow(r, data.outputRowMeta.size());

  putRow(data.outputRowMeta, r);

  return true;
}

为提高效率,Kettle为每一个步骤单独启动一个工作线程来执行任务。线程一直在执行步骤的processRow方法,直到出现以下情况:

  • processRow方法返回false,代表工作已经正常完成
  • isStopped方法返回true,代表步骤被强制停止
  • processRow方法执行过程中出现异常,代表执行过程中出现错误,Kettle将调用stopAll方法,从而导致整个转换的所有工作线程停止执行。

3.资源释放

Kettle为每一个步骤单独启动一个工作线程来执行任务,不管工作线程是正常执行完毕还是异常执行完毕,最终会调用dispose方法来释放资源。方法如下:

public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
  parent.disposeImpl(smi, sdi);
}

通常情况下,我们只需要重写Main中的processRow()方法即可满足需求,如果用到了一些重量级的资源,最好在init方法中初始化,并在dispose方法中释放。

由于Kettle使用Janino框架为自定义Java转换步骤类动态定义了类名,并指定父类为TransformClassBase,所以在编写Java代码时,只需要提供类的内容即可,无需class声明。既然自动建立了父类,那么父类的成员、方法等都可以在代码中重用。

父类常用的成员包括以下三个实例:

  • parent:代表容器对象
  • meta:代表容器元数据对象
  • data:代表容器数据对象

常用的方法包括:

  • getRow:从输入行集中取一行数据
  • putRow:存一行数据到输出行集
  • stopAll:停止所有工作线程
  • setOutputDone:标记本步骤工作完成
  • logBasic:输出基本日志
  • logError:输出错误日志
  • getInputRowMeta:得到输入行的元数据
  • createOutputRow:创建一个输出行数据

其实,常用的方法,基本上都在Java代码步骤属性对话框左侧Code Snippits中,如下图所示:
在这里插入图片描述
一般情况下,可以双击其中的Main节点,从processRow方法的重写开始,需要其他代码时,在左侧找到对应代码块,双击即可加入。

Java代码案例

例如新增一个拼接字段name
完整代码

//处理行数据
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
  //标识是否是第一行数据
  if (first) {
    first = false;
  }
  //等待前一步骤提供一行数据,会阻塞,返回一个对象数组表示输入行
  Object[] r = getRow();
  //输入行为null,说明已经没有输入行了,设置输出完成
  if (r == null) {
    setOutputDone();
    return false;
  }

  //创建输出行,确认object数组是否能够容纳输出行,如果不够,扩容
  r = createOutputRow(r, data.outputRowMeta.size());

  //初始化输出字符串
  String out_value = "";
  //新增一个字段输出
  if(get(Fields.In,"id").getNumber(r)>0){
   out_value = get(Fields.In,"name").getString(r) +"_" + get(Fields.In,"id").getNumber(r);
  }
  //设置输出字段和值
  get(Fields.Out,"name_id").setValue(r,out_value);
  //将输出数据传输到下一个步骤,会阻塞
  putRow(data.outputRowMeta, r);

  //返回行数据处理是否成功
  return true;
}

输出结果
在这里插入图片描述
在这里插入图片描述

注意:
get(Fields.In,“name”):Fields.In代表输入;这个"name"从前一个步骤获取,通常根据上游字段获取即可,当然,也可以在这个步骤下方的参数中设置,设置如:xx,将代码改为get(Fields.In,xx);
get(Fields.Out, “name_salay”) :Fields.Out代表输出;这个"name_salay"就是下方的字段中设置的name_salay。
输入输出字段名都可以在下方的参数设置,用getParameter(“name”)获取到name的值。

案例参考原文链接:https://blog.csdn.net/u010839779/article/details/133089521


http://www.kler.cn/a/578493.html

相关文章:

  • 八字排盘宝 2025.1.8 | 多模式排盘工具,精准解析八字信息,轻量易用
  • 【机器学习chp11】聚类(K均值+高斯混合模型+层次聚类+基于密度的聚类DBSCAN+基于图的聚类+聚类的性能评价指标)
  • Unity之如何实现哔哩哔哩直播弹幕游戏
  • Python脚本,音频格式转换 和 视频格式转换
  • Pytorch 第八回:卷积神经网络——GoogleNet模型
  • SpringBoot 配置视图控制器
  • Android Activity的启动器ActivityStarter入口
  • 使用 Java 在后端 为 PDF 添加水印
  • 跟着 Lua 5.1 官方参考文档学习 Lua (11)
  • AtCoder ABC E - Min of Restricted Sum 题解
  • Etcd的安装与使用
  • Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现与实战指南
  • 通过定制initramfs实现从单系统分区到双系统的无缝升级
  • 手抖护理全攻略:从生活点滴改善症状
  • AI 智能:开拓未知疆域的科技先锋
  • 11-Agent中配置自己的插件
  • 芋道源码 —— Spring Boot 缓存 Cache 入门
  • 搭建农产品管理可视化,助力农业智能化
  • scala函数的至简原则
  • Android Retrofit + RxJava + OkHttp 网络请求高效封装方案