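Integration Testing in Flink
Background
In day-to-day testing, Flink's TestHarness only lets us test a single operator. In many cases we need an integration test to catch the real problems, so integration testing in Flink is well worth doing. This post records how to do it.
Running integration tests in Flink
The key class for integration testing in Flink is MiniClusterWithClientResource, which starts a local Flink cluster. Let's first look at the core code of an integration test: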
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import static org.junit.Assert.assertTrue;

/**
 * Flink integration test
 * https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/testing/
 */
public class FlinkIntegrationTest {

    // heartbeat.timeout is given in milliseconds
    public static final Configuration config = Configuration.fromMap(new HashMap<String, String>() {
        {
            put("heartbeat.timeout", "300000");
        }
    });

    // one local mini cluster shared by all tests in this class: 3 TaskManagers with 1 slot each
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(config)
                            .setNumberSlotsPerTaskManager(1)
                            .setNumberTaskManagers(3)
                            .build());

    @Test
    public void testStateFlatMap() throws Exception {
        StatefulFlatMap statefulFlatMap = new StatefulFlatMap();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // configure your test environment
        env.setParallelism(2);
        // values are collected in a static variable
        CollectSink.values.clear();
        // every element gets the same key "1", so all of them share one state entry
        env.fromElements("world", "hi").keyBy(e -> "1").flatMap(statefulFlatMap).addSink(new CollectSink());
        // execute
        env.execute();
        // verify your results
        assertTrue(CollectSink.values.containsAll(Arrays.asList("hello world", "hello hi world")));
    }

    @Test
    public void testStateFlatMap1() throws Exception {
        StatefulFlatMap statefulFlatMap = new StatefulFlatMap();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // configure your test environment
        env.setParallelism(2);
        // values are collected in a static variable
        CollectSink.values.clear();
        // each element is its own key, so state is kept per distinct value
        env.fromElements("world", "hi", "world").keyBy(e -> e).flatMap(statefulFlatMap).addSink(new CollectSink());
        // execute
        env.execute();
        // verify your results
        assertTrue(CollectSink.values.containsAll(Arrays.asList("hello world", "hello hi", "hello world world")));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<String> {
        // must be static: Flink serializes the sink, so every parallel instance is a separate copy
        public static final List<String> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, Context context) throws Exception {
            values.add(value);
        }
    }
}
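// StatefulFlatMap lives in its own file
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class StatefulFlatMap extends RichFlatMapFunction<String, String> {
    // keyed state: the previous input seen for the current key
    private transient ValueState<String> previousInput;

    @Override
    public void open(Configuration parameters) throws Exception {
        previousInput = getRuntimeContext().getState(
                new ValueStateDescriptor<String>("previousInput", Types.STRING));
    }

    @Override
    public void flatMap(String in, Collector<String> collector) throws Exception {
        String out = "hello " + in;
        // append the previous input for this key, if there is one
        if (previousInput.value() != null) {
            out = out + " " + previousInput.value();
        }
        previousInput.update(in);
        collector.collect(out);
    }
}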
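Because this is an integration test, we usually construct the input source and the output sink ourselves, such as the CollectSink here. With that in place, we can run integration tests on a whole pipeline, including its state.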
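To see why testStateFlatMap and testStateFlatMap1 expect different outputs, it can help to model the keyed state in plain Java. The sketch below is only an approximation, not Flink code: the class name KeyedStateModel and its run method are hypothetical, and a HashMap stands in for Flink's ValueState, which is scoped to the current key. It assumes elements are processed in input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java model of the keyed-state logic in StatefulFlatMap (hypothetical helper, not Flink API). */
public class KeyedStateModel {

    /**
     * Processes the inputs in order and keeps one "previous input" slot per key,
     * mimicking how a ValueState is scoped to the key chosen by keyBy.
     */
    public static List<String> run(List<String> inputs, Function<String, String> keySelector) {
        Map<String, String> previousInputByKey = new HashMap<>();
        List<String> outputs = new ArrayList<>();
        for (String in : inputs) {
            String key = keySelector.apply(in);
            String previous = previousInputByKey.get(key);
            String out = "hello " + in;
            if (previous != null) {
                out = out + " " + previous;
            }
            previousInputByKey.put(key, in);
            outputs.add(out);
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Constant key, as in testStateFlatMap: both elements share one state slot.
        System.out.println(run(Arrays.asList("world", "hi"), e -> "1"));
        // prints [hello world, hello hi world]

        // Keyed by the element itself, as in testStateFlatMap1: one state slot per distinct value.
        System.out.println(run(Arrays.asList("world", "hi", "world"), e -> e));
        // prints [hello world, hello hi, hello world world]
    }
}
```

On the real mini cluster with parallelism 2, records with different keys may reach the sink in a different order, which is presumably why the tests assert with containsAll rather than comparing ordered lists.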