kettle工具使用从入门到精通(二)-------Java代码案例
Java代码步骤,属于脚本类别转换,是指对上游数据使用Java代码处理之后往下游发送。Java代码步骤,适用于熟悉Java语言的开发人员,用好这个步骤,需要对类、接口、多线程等语言相关知识有所掌握,并且需要对Kettle的基础框架有所理解。Kettle转换的执行,通常有以下三个核心的阶段:
核心代码
1. 初始化
Kettle转换在执行前,会有一个各步骤的初始化动作,为步骤执行前的准备工作创造机会。为提高初始化的性能,Kettle为每个步骤启用一个初始化线程,从而并行完成所有步骤的初始化。初始化的主要内容就是调用一次步骤的以下方法:
public boolean init(StepMetaInterface stepMetaInterface, StepDataInterface stepDataInterface) {
return parent.initImpl(stepMetaInterface, stepDataInterface);
}
此方法包含两个参数。其中,meta为元数据,data为数据。如果返回true,那么代表初始化成功,否则代表初始化失败。任何一个步骤初始化失败,都会导致整个转换停止执行(在停止前,会调用每一个转换的资源释放方法dispose)。
2. 执行
执行阶段是每一个步骤实现特定操作的时候,操作通常就是指Main函数中的行数据处理,行处理核心代码如下:
/**
从输入行集中判断是否第一行,判断目的是将其标识,第一行需要特定处理,然后将新的行放入输出行集中。
从输入行集中取数据可以调用getRow方法。
如果getRow方法返回值不为null,则步骤应将该行数据进行处理,并调用putRow方法将处理结果存入输出行集,然后返回true,以继续为下一行输入数据处理提供机会。
如果getRow方法返回null,代表输入行集已经处理完毕,这时可以调用setOutputDone,标识本步骤执行完毕,并返回false,以结束本工作线程的执行。
*/
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
if (first) {
first = false;
}
Object[] r = getRow();
if (r == null) {
setOutputDone();
return false;
}
r = createOutputRow(r, data.outputRowMeta.size());
putRow(data.outputRowMeta, r);
return true;
}
为提高效率,Kettle为每一个步骤单独启动一个工作线程来执行任务。线程一直在执行步骤的processRow方法,直到出现以下情况:
- processRow方法返回false,代表工作已经正常完成
- isStopped方法返回true,代表步骤被强制停止
- processRow方法执行过程中出现异常,代表执行过程中出现错误,Kettle将调用stopAll方法,从而导致整个转换的所有工作线程停止执行。
3.资源释放
Kettle为每一个步骤单独启动一个工作线程来执行任务,不管工作线程是正常执行完毕还是异常执行完毕,最终会调用dispose方法来释放资源。方法如下:
public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
parent.disposeImpl(smi, sdi);
}
通常情况下,我们只需要重写Main中的processRow()方法即可满足需求,如果用到了一些重量级的资源,最好在init方法中初始化,并在dispose方法中释放。
由于Kettle使用Janino框架为自定义Java转换步骤类动态定义了类名,并指定父类为TransformClassBase,所以在编写Java代码时,只需要提供类的内容即可,无需class声明。既然自动建立了父类,那么父类的成员、方法等都可以在代码中重用。
父类常用的成员包括以下三个实例:
- parent:代表容器对象
- meta:代表容器元数据对象
- data:代表容器数据对象
常用的方法包括:
- getRow:从输入行集中取一行数据
- putRow:存一行数据到输出行集
- stopAll:停止所有工作线程
- setOutputDone:标记本步骤工作完成
- logBasic:输出基本日志
- logError:输出错误日志
- getInputRowMeta:得到输入行的元数据
- createOutputRow:创建一个输出行数据
其实,常用的方法,基本上都在Java代码步骤属性对话框左侧Code Snippits中,如下图所示:
一般情况下,可以双击其中的Main节点,从processRow方法的重写开始,需要其他代码时,在左侧找到对应代码块,双击即可加入。
Java代码案例
例如新增一个拼接字段name
完整代码
//处理行数据
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
//标识是否是第一行数据
if (first) {
first = false;
}
//等待前一步骤提供一行数据,会阻塞,返回一个对象数组表示输入行
Object[] r = getRow();
//输入行为null,说明已经没有输入行了,设置输出完成
if (r == null) {
setOutputDone();
return false;
}
//创建输出行,确认object数组是否能够容纳输出行,如果不够,扩容
r = createOutputRow(r, data.outputRowMeta.size());
//初始化输出字符串
String out_value = "";
//新增一个字段输出
if(get(Fields.In,"id").getNumber(r)>0){
out_value = get(Fields.In,"name").getString(r) +"_" + get(Fields.In,"id").getNumber(r);
}
//设置输出字段和值
get(Fields.Out,"name_id").setValue(r,out_value);
//将输出数据传输到下一个步骤,会阻塞
putRow(data.outputRowMeta, r);
//返回行数据处理是否成功
return true;
}
输出结果
注意:
get(Fields.In,“name”):Fields.In代表输入;这个"name"从前一个步骤获取,通常根据上游字段获取即可,当然,也可以在这个步骤下方的参数中设置,设置如:xx,将代码改为get(Fields.In,xx);
get(Fields.Out, “name_salay”) :Fields.Out代表输出;这个"name_salay"就是下方的字段中设置的name_salay。
输入输出字段名都可以在下方的参数设置,用getParameter(“name”)获取到name的值。
案例参考原文链接:https://blog.csdn.net/u010839779/article/details/133089521