当前位置: 首页 > article >正文

flink的集成测试

背景

日常测试中我们使用flink的TestHarness只能测试单个算子,很多情况下我们需要集成测试来测试真正的问题,所以在flink中进行集成测试还是非常有必要的,本文就来记录下如何在flink中进行集成测试

flink中进行集成测试

flink中进行集成测试的关键类MiniClusterWithClientResource,这是一个启动本地flink集群的关键类,先看一下集成测试的关键代码:

/**
 * FLINK集成测试
 * https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/testing/
 *
 */
public class FlinkIntegrationTest {

    public static final Configuration config = Configuration.fromMap(new HashMap<String, String>() {
        {
            put("heartbeat.timeout", "300000");
        }
    });

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config)
                    .setNumberSlotsPerTaskManager(1).setNumberTaskManagers(3).build());

    @Test
    public void testStateFlatMap() throws Exception {
        StatefulFlatMap statefulFlatMap = new StatefulFlatMap();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements("world", "hi").keyBy(e -> "1").flatMap(statefulFlatMap).addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi world")));
    }

    @Test
    public void testStateFlatMap1() throws Exception {
        StatefulFlatMap statefulFlatMap = new StatefulFlatMap();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements("world", "hi", "world").keyBy(e -> e).flatMap(statefulFlatMap).addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi", "hello world world")));
    }



    // create a testing sink
    private static class CollectSink implements SinkFunction<String> {

        // must be static
        public static final List<String> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, Context context) throws Exception {
            values.add(value);
        }
    }


}

public class StatefulFlatMap extends RichFlatMapFunction<String, String> {

    ValueState<String> previousInput;

    @Override
    public void open(Configuration parameters) throws Exception {
        previousInput = getRuntimeContext().getState(
                new ValueStateDescriptor<String>("previousInput", Types.STRING));
    }

    @Override
    public void flatMap(String in, Collector<String> collector) throws Exception {
        String out = "hello " + in;
        if(previousInput.value() != null){
            out = out + " " + previousInput.value();
        }
        previousInput.update(in);
        collector.collect(out);
    }

由于我们是集成测试,我们一般输入source和输出sink是自己构造的,比如这里的CollectSink,这里就可以正常测试包括状态在内的pineline集成测试了


http://www.kler.cn/a/145524.html

相关文章:

  • sentinel微服务保护
  • CMake技术细节:解决未定义,提供参数
  • Django 的 `Meta` 类和外键的使用
  • 基于 Spring Boot 和 Vue.js 的全栈购物平台开发实践
  • C语言程序设计十大排序—选择排序
  • python进程池、线程池
  • 通过ros系统中websocket中发送sensor_msgs::Image数据给web端显示(二)
  • 视频号小店入驻需要多少资金?入驻费用详解!
  • 【领域驱动设计 学习目标及大纲】从CRUD到架构设计
  • YUV颜色空间与RGB的转换
  • 使用 Kafka 和 Cassandra 构建实时异常检测实验
  • 这是一张单纯的图片-MISC-bugku-解题步骤
  • 【JavaWeb】Servlet
  • Arduino驱动温湿度气压光照传感器模块
  • C语言 - 语句
  • leetcode刷题详解九
  • [递归回溯] 八皇后问题
  • 基于Spring、SpringMVC、MyBatis的闪烁物业管理系统
  • Hugging Face宣布最受欢迎的AI机构,开源模型ChatGLM-6B广受认可
  • 操作系统——操作系统概论s
  • WIFI模块(esp-01s)获取网络时间代码实现
  • vue+elementui如何实现在表格中点击按钮预览图片?
  • Vue2中的两种普通注册方式
  • 1.用数组输出0-9
  • 异步组件与函数式组件
  • 【GPT-3.5】通过python调用ChatGPT API与ChatGPT对话交流