当前位置: 首页 > article >正文

Flink的环境搭建及使用

在idea中创建一个Maven项目,导入Flink的依赖,在代码中创建Flink环境,编写代码.

如果不想去找flink依赖,就去flink官网,提供了一个mvn的命令,快速下载在本地构建一个flink的项目,可以直接从这个项目的pom.xml文件中拿到依赖配置

一、环境搭建

pom.xml文件的依赖导入

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.15.4</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>


二、使用Flink

以WordCount为例:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo1WordCount {

    public static void main(String[] args) throws Exception {
        //1、创建flink的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //设置并行度,一个并行度对应一个task
        env.setParallelism(2);

        //修改数据从上游发送到下游的缓存时间
        env.setBufferTimeout(2000);


        /*
         * 无界流
         */
        //2、读取数据
        //nc -lk 8888
        DataStream<String> linesDS = env.socketTextStream("master", 8888);


        //一行转换成多行
        DataStream<String> wordsDS = linesDS
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        for (String word : line.split(",")) {
                            //将数据发送到下游
                            out.collect(word);
                        }
                    }
                });

        //转换成kv格式
        DataStream<Tuple2<String, Integer>> kvDS = wordsDS
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        //返回一个二元组
                        return Tuple2.of(word, 1);
                    }
                });

        //按照单词进行分组
        //底层是hash分区
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> kv) throws Exception {
                        return kv.f0;
                    }
                });

        //统计数量
        DataStream<Tuple2<String, Integer>> countDS = keyByDS
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> kv1,
                                                          Tuple2<String, Integer> kv2) throws Exception {
                        int count = kv1.f1 + kv2.f1;
                        return Tuple2.of(kv1.f0, count);
                    }
                });

        //打印结果
        countDS.print();

        //3、启动flink
        env.execute("wc");
    }
}


http://www.kler.cn/a/381512.html

相关文章:

  • 【智能算法改进】混沌映射策略--一网打尽
  • 产品经理笔记
  • JavaScript数据类型- Symbol 详解
  • SpringBoot+ClickHouse集成
  • Linux中的软硬链接文件详解
  • Logstash 安装与部署(无坑版)
  • 基于java+SpringBoot+Vue的旅游管理系统设计与实现
  • android 怎么查看依赖包的大小
  • 【含文档】基于ssm+jsp的图书管理系统(含源码+数据库+lw)
  • Spring 框架中常见的注解(Spring、SpringMVC、SpringBoot)
  • Spleeter:音频分离的革命性工具
  • SQL实战训练之,力扣:1843. 可疑银行账户
  • Centos7 搭建 Java Web 开发环境 源码安装redis
  • 使用 ABAP GIT 发生 IF_APACK_MANIFEST dump
  • 原厂出具的校准报告有效吗?是否还要找第三方校准?
  • Vue3-子传父
  • Linux:网络协议socket
  • JAVA学习-练习试用Java实现“斐波那契数列”
  • 【Nginx系列】Nginx 中的`proxy_set_header`指令:Host 字段的区别与联系
  • 免费工单管理利器 优选系统 助力团队协作
  • 论文翻译 | Legal Prompt Engineering for Multilingual Legal Judgement Prediction
  • C++游戏开发:构建高性能游戏世界的基石
  • 【青牛科技】GC6129:摇头机中替代 MS41929 的理想之选
  • FPGA实现以太网(一)、以太网基础知识
  • leecode 44. 通配符匹配
  • 重学Android:自定义View基础(一)