当前位置: 首页 > article >正文

flink学习(6)——自定义source和kafka

概述

SourceFunction:非并行数据源(并行度只能=1) --接口

RichSourceFunction:多功能非并行数据源(并行度只能=1) --类

ParallelSourceFunction:并行数据源(并行度能够>=1) --接口

RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】

——Rich 字样代表富有,在编程中,富有代表可以调用的方法很多,功能很全的意思。

 基础案例

package com.bigdata.day02;

//1、SourceFunction
// public class ZidingyiSource implements SourceFunction<Student> {
//2、RichSourceFunction
// public class ZidingyiSource extends RichSourceFunction<Student> {
//3、ParallelSourceFunction
//public class ZidingyiSource implements ParallelSourceFunction<Student> {
//4、RichParallelSourceFunction
//public class ZidingyiSource extends RichParallelSourceFunction<Student> {
// 推荐的
public class ZidingyiSource extends RichParallelSourceFunction<Student> {

    // ctrl + o
    private final Random random = new Random();
    private boolean flag = true;

    // 现在不用
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("实现一些资源的开启");
    }

    // 现在不用
    @Override
    public void close() throws Exception {
        System.out.println("实现一些资源的关闭");
    }


    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        while (flag){

            String stu_id = UUID.randomUUID().toString();
            String stu_name = "Student_"+stu_id;
            int stu_age = random.nextInt(8)+10;
            long stu_timestamp = System.currentTimeMillis();
            Student student = new Student(stu_id,stu_name,stu_age,stu_timestamp);
            sourceContext.collect(student);
            Thread.sleep(1000);
        }
    }

    // 具体什么时候 会调用还不知道
    @Override
    public void cancel() {
        flag = false;
        System.out.println("停止运行");
    }
}


//调用
public class ZiDingYi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // add + new 
        DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());
        int parallelism = studentDataStreamSource.getParallelism();
        System.out.println(parallelism);
        // print之前与之后的并行度是不同的
        studentDataStreamSource.print().setParallelism(1);
        env.execute();
    }
}

cancel+open+close的调用时机

package com.bigdata.day02;

import java.util.Objects;

/*
* 1、这几个方法都会按照并行度调用多次 调度的次数 按照studentDataStreamSource的并行度
*
*/

public class ZiDingYi {
    public static void main(String[] args) throws Exception {
    // 在上面案例的基础上实现
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        DataStreamSource<Student> studentDataStreamSource = env.addSource(new ZidingyiSource());
        
        // 此时就只会调用一次了
        studentDataStreamSource.setParallelism(1);
        
        // 此时打印也会有多个并行度(8个cpu)
        studentDataStreamSource.print();
        

        // 异步调用 此时会调用open方法
        JobExecutionResult execute = env.execute();
        JobClient flink_job = env.executeAsync("Flink Job");
        Thread.sleep(3000);
        // 此时会调用 cancel 和 close 
        flink_job.cancel();
    }
}

 kafkaSource

package com.bigdata.day02;

import java.util.Properties;

public class KafkaSource {
    public static void main(String[] args) throws Exception{
        //env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // properties 
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        
        // consumer
        FlinkKafkaConsumer<String> consumer= new FlinkKafkaConsumer<String>("yhedu",new SimpleStringSchema(),properties);
        
        // source
        DataStreamSource<String> dataStreamSource = env.addSource(consumer);
        dataStreamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.contains("success");
            }
        }).print();

        env.execute();

    }
}


http://www.kler.cn/a/408160.html

相关文章:

  • D78【 python 接口自动化学习】- python基础之HTTP
  • 搜索二维矩阵
  • Java算法OJ(10)哈希表练习
  • 鸿蒙NEXT开发案例:随机数生成
  • git使用(二)
  • ElasticSearch7.x入门教程之集群安装(一)
  • CCF认证202406-02 | 矩阵重塑(其二)
  • 计算机网络socket编程(6)_TCP实网络编程现 Command_server
  • node报错:cb.apply is not a function
  • 附录 9A 计量经济学实验室#5
  • 二号交叉学科楼的英文表达是什么?No. 2 Interdisciplinary Research Building
  • 电子应用设计方案-22:智能门禁系统方案设计
  • React 表单Form 中的 useForm
  • Linux内核
  • 创建可重用React组件的实用指南
  • 算法模板2:位运算+离散化+区间合并
  • 【Qt流式布局改造支持任意位置插入和删除】
  • CoAP 协议介绍:特性、应用与优劣势
  • 大语言模型---RewardBench 介绍;RewardBench 的主要功能;适用场景
  • 使用Python编写一个简单的网页爬虫,从网站抓取标题和内容。
  • windows C#-异步编程模型(下)
  • 使用go实现流式输出
  • Mac 环境变量配置基础教程
  • 贪心算法 day07
  • 嵌入式学习-C嘎嘎-Day08
  • 第三百二十九节 Java网络教程 - Java网络UDP套接字