当前位置: 首页 > article >正文

Flink中自定义Source和Sink的使用

只要自定一个Source类实现SourceFunction接口,一个Sink类实现SinkFunction接口,就能正常使用自定义的Source和Sink,或者直接extends继承RichSourceFunction和RichSinkFunction,RichSinkFunction:多个open和close方法

1、自定义Source

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class Demo3SourceFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用自定义source
        DataStream<Integer> myDS = env.addSource(new MySource());
        myDS.print();

        env.execute();
    }
}

//自定义source
//实现SourceFunction接口
class MySource implements SourceFunction<Integer> {
    //在run方法中读取外部的数据,使用原生java代码
    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (true) {
            ctx.collect(1);
            Thread.sleep(1000);
        }
    }

    //cancel方法是任务被取消是执行的,用于回收资源
    @Override
    public void cancel() {
    }
}

2、自定义Sink

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class Demo2MySink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);
        linesDS.addSink(new MySink());

        env.execute();
    }
}

//自定义Sink
class MySink implements SinkFunction<String> {
    //每一条数据执行一次
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("mySink:" + value);
    }
}


http://www.kler.cn/a/395395.html

相关文章:

  • [DB]
  • Redis知识点整理 - 脑图
  • BILSTM法律网站用户提问自动分类
  • 探索MoviePy:Python视频编辑的瑞士军刀
  • Nuxt.js 应用中的 schema:beforeWrite 事件钩子详解
  • uniapp在app模式下组件传值
  • LeetCode297.二叉树的序列化和反序列化
  • 计算机网络前三章计算题总结
  • C++基础:Pimpl设计模式的实现
  • 【Pikachu】目录遍历实战
  • 论文解析:计算能力资源的可信共享:利益驱动的异构网络服务提供机制
  • 群控系统服务端开发模式-应用开发-前端角色功能开发
  • 解决Oracle DECODE函数字符串截断问题的深度剖析20241113
  • Ubuntu相关指令
  • 数据结构Python版
  • sqoop import将Oracle数据加载至hive,数据量变少,只能导入一个mapper的数据量
  • 【GPTs】MJ Prompt Creator:轻松生成创意Midjourney提示词
  • 【Git从入门到精通】——Git分支介绍与GitHub相关知识总结
  • Spring Boot与工程认证:计算机课程管理的新纪元
  • Spring Boot框架:电商系统的设计与实现
  • 037 RabbitMQ集群
  • 【Linux】多线程(中)
  • 电子电气架构 --- 基于以太网的电子电气架构概述
  • 大模型在蓝鲸运维体系应用——蓝鲸运维开发智能助手
  • 文心一言 VS 讯飞星火 VS chatgpt (389)-- 算法导论25.1 2题
  • 【Qt聊天室客户端】消息功能--发布程序