当前位置: 首页 > article >正文

Spring Boot 整合 Apache Flink 教程

精心整理了最新的面试资料和简历模板,有需要的可以自行获取

点击前往百度网盘获取
点击前往夸克网盘获取


Spring Boot 整合 Apache Flink 教程

一、背景与目标

Apache Flink 是一个高性能的分布式流处理框架,而Spring Boot提供了快速构建企业级应用的能力。整合二者可实现:

  1. 利用Spring Boot的依赖注入、配置管理等功能简化Flink作业开发
  2. 构建完整的微服务架构,将流处理嵌入Spring生态
  3. 实现动态作业提交与管理

二、环境准备

  • JDK 17+
  • Maven 3.8+
  • Spring Boot 3.1.5
  • Flink 1.17.2

三、创建项目 & 添加依赖

1. 创建Spring Boot项目

使用Spring Initializr生成基础项目,选择:

  • Maven
  • Spring Web(可选,用于创建REST接口)

2. 添加Flink依赖

<!-- pom.xml -->
<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <!-- Flink核心依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>

    <!-- 本地执行时需添加 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime</artifactId>
        <version>1.17.2</version>
        <scope>test</scope>
    </dependency>
</dependencies>

四、基础整合示例

1. 编写Flink流处理作业

// src/main/java/com/example/demo/flink/WordCountJob.java

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountJob {

    public static void execute() throws Exception {
        final StreamExecutionEnvironment env = 
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.fromElements(
            "Spring Boot整合Flink",
            "Flink实时流处理",
            "Spring生态集成"
        );

        DataStream<WordCount> counts = text
            .flatMap(new FlatMapFunction<String, WordCount>() {
                @Override
                public void flatMap(String value, Collector<WordCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordCount(word, 1L));
                    }
                }
            })
            .keyBy(value -> value.word)
            .sum("count");

        counts.print();
        env.execute("Spring Boot Flink Job");
    }

    public static class WordCount {
        public String word;
        public long count;

        public WordCount() {}

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

2. 在Spring Boot中启动作业

// src/main/java/com/example/demo/DemoApplication.java

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        WordCountJob.execute(); // 启动Flink作业
    }
}

五、进阶整合 - 通过REST API动态提交作业

1. 创建Job提交服务

// src/main/java/com/example/demo/service/FlinkJobService.java

@Service
public class FlinkJobService {

    public String submitWordCountJob(List<String> inputLines) {
        try {
            final StreamExecutionEnvironment env = 
                StreamExecutionEnvironment.getExecutionEnvironment();
            
            DataStream<String> text = env.fromCollection(inputLines);
            
            // ...(同上WordCount逻辑)
            
            JobExecutionResult result = env.execute();
            return "JobID: " + result.getJobID();
        } catch (Exception e) {
            return "Job Failed: " + e.getMessage();
        }
    }
}

2. 创建REST控制器

// src/main/java/com/example/demo/controller/JobController.java

@RestController
@RequestMapping("/jobs")
public class JobController {

    @Autowired
    private FlinkJobService flinkJobService;

    @PostMapping("/wordcount")
    public String submitWordCount(@RequestBody List<String> inputs) {
        return flinkJobService.submitWordCountJob(inputs);
    }
}

六、关键配置说明

1. application.properties

# 设置Flink本地执行环境
spring.flink.local.enabled=true
spring.flink.job.name=SpringBootFlinkJob

# 调整并行度(根据CPU核心数)
spring.flink.parallelism=4

2. 解决依赖冲突

在pom.xml中排除冲突依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.17.2</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

七、运行与验证

  1. 启动Spring Boot应用:
mvn spring-boot:run
  1. 调用API提交作业:
curl -X POST -H "Content-Type: application/json" \
-d '["Hello Flink", "Spring Boot Integration"]' \
http://localhost:8080/jobs/wordcount
  1. 查看控制台输出:
Flink> Spring : 1
Flink> Boot : 1
Flink> Integration : 1
...

八、生产环境注意事项

  1. 集群部署:将打包后的jar提交到Flink集群

    flink run -c com.example.demo.DemoApplication your-application.jar
    
  2. 状态管理:集成Flink State Backend(如RocksDB)

  3. 监控集成:通过Micrometer接入Spring Boot Actuator

  4. 资源隔离:使用YarnKubernetes部署模式


九、完整项目结构

src/
├── main/
│   ├── java/
│   │   ├── com/example/demo/
│   │   │   ├── DemoApplication.java
│   │   │   ├── flink/
│   │   │   │   └── WordCountJob.java
│   │   │   ├── controller/
│   │   │   ├── service/
│   ├── resources/
│   │   └── application.properties
pom.xml

通过以上步骤,即可实现Spring Boot与Apache Flink的深度整合。这种架构特别适合需要将实时流处理能力嵌入微服务体系的场景,如实时风控系统、IoT数据处理平台等。后续可扩展集成Kafka、HBase等大数据组件。


http://www.kler.cn/a/598255.html

相关文章:

  • 【C++初阶】从零开始模拟实现vector(含迭代器失效详细讲解)
  • Linux常用命令与权限理解总结
  • Spring Boot(十七):集成和使用Redis
  • Java面试黄金宝典9
  • 在Linux、Windows系统上安装开源InfluxDB——InfluxDB OSS v2并设置开机自启的保姆级图文教程
  • 用selenium+ChromeDriver爬取知乎评论区(但要求登录以及反爬机制爬不到数据)
  • 【菜鸟飞】AI多模态:vsCode下python访问阿里云通义文生图API
  • 使用Python和PyTorch实现了一个简单的生成对抗网络(GAN)用于生成应力值图像
  • 正则表达式基本语法和Java中的简单使用
  • fastapi 实践(三)Swagger Docs
  • STM32基础教程——PWM驱动LED呼吸灯
  • AIGC 新势力:探秘海螺 AI 与蓝耘 MaaS 平台的协同创新之旅
  • 【Jwt】详解认证登录的数字签名
  • 牛客网【模板】二维差分(详解)c++
  • 【JavaEE】网络编程socket
  • Java学习路线(便于理解)
  • PostgreSQL_数据使用与日数据分享
  • C语言-访问者模式详解与实践
  • Enovia许可分析的自动化解决方案
  • 程序代码篇---Pyqt的密码界面