Tutorial: Integrating Spring Boot with Apache Flink
I. Background and Goals
Apache Flink is a high-performance distributed stream-processing framework, while Spring Boot makes it easy to build enterprise applications quickly. Integrating the two lets you:
- Use Spring Boot's dependency injection and configuration management to simplify Flink job development
- Build a complete microservice architecture that embeds stream processing in the Spring ecosystem
- Submit and manage jobs dynamically
II. Prerequisites
- JDK 17+
- Maven 3.8+
- Spring Boot 3.1.5
- Flink 1.17.2
III. Creating the Project & Adding Dependencies
1. Create the Spring Boot project
Generate the base project with Spring Initializr, selecting:
- Maven
- Spring Web (optional, for exposing REST endpoints)
2. Add the Flink dependencies
<!-- pom.xml -->
<dependencies>
    <!-- Spring Boot Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- Flink core dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.2</version>
        <scope>provided</scope>
    </dependency>
    <!-- Needed to execute jobs locally from inside the application (provides the local executor) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.2</version>
    </dependency>
</dependencies>
IV. Basic Integration Example
1. Write the Flink streaming job
// src/main/java/com/example/demo/flink/WordCountJob.java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class WordCountJob {

    public static void execute() throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded demo source; in a real job this would typically be Kafka, a socket, etc.
        DataStream<String> text = env.fromElements(
                "Spring Boot integrates with Flink",
                "Flink real-time stream processing",
                "Spring ecosystem integration"
        );

        DataStream<WordCount> counts = text
                // Split each line on whitespace and emit a (word, 1) record per word
                .flatMap(new FlatMapFunction<String, WordCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordCount> out) {
                        for (String word : value.split("\\s+")) {
                            out.collect(new WordCount(word, 1L));
                        }
                    }
                })
                // Group by word and sum the "count" field of the POJO
                .keyBy(value -> value.word)
                .sum("count");

        counts.print();
        env.execute("Spring Boot Flink Job");
    }

    // Must be a Flink POJO (public no-arg constructor, public fields) so that
    // keyBy/sum can access the "word" and "count" fields by name.
    public static class WordCount {
        public String word;
        public long count;

        public WordCount() {}

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
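If you want to check the pipeline's output programmatically instead of only relying on print(), DataStream.executeAndCollect can pull results back into the driver process. Below is a minimal sketch under that idea; the class name and sample data are illustrative and not part of the original tutorial:

// WordCountLocalCheck.java - local sanity check of a stream, names are illustrative
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountLocalCheck {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> words = env
                .fromElements("Hello Flink", "Spring Boot Integration")
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) {
                        for (String w : line.split("\\s+")) {
                            out.collect(w);
                        }
                    }
                });

        // executeAndCollect(n) runs the job and returns at most n records to the driver.
        List<String> result = words.executeAndCollect(10);
        result.forEach(System.out::println);
    }
}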
2. Launch the job from Spring Boot
// src/main/java/com/example/demo/DemoApplication.java
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.example.demo.flink.WordCountJob;

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Runs the Flink job once the Spring context has started. execute() blocks this
        // thread until the bounded demo job finishes; for an unbounded source, run the
        // job on a separate thread or use executeAsync().
        WordCountJob.execute();
    }
}
V. Advanced Integration: Submitting Jobs Dynamically via a REST API
1. Create the job submission service
// src/main/java/com/example/demo/service/FlinkJobService.java
import java.util.List;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Service;

@Service
public class FlinkJobService {

    public String submitWordCountJob(List<String> inputLines) {
        try {
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> text = env.fromCollection(inputLines);
            // ... (same WordCount pipeline as above) ...
            JobExecutionResult result = env.execute();
            return "JobID: " + result.getJobID();
        } catch (Exception e) {
            return "Job Failed: " + e.getMessage();
        }
    }
}
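One caveat with the service above: env.execute() blocks the HTTP request thread until the job has finished. A minimal sketch of a non-blocking variant uses executeAsync(), which submits the job and returns a JobClient immediately; the class name AsyncFlinkJobService and the job name used here are assumptions made for the sketch, not part of the original project:

// Sketch only: non-blocking submission with executeAsync()
import java.util.List;

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Service;

@Service
public class AsyncFlinkJobService {

    public String submitWordCountJob(List<String> inputLines) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromCollection(inputLines)
           // ... same flatMap / keyBy / sum pipeline as in WordCountJob ...
           .print();

        // executeAsync() submits the job and returns a JobClient right away,
        // instead of blocking until the job completes like execute() does.
        JobClient client = env.executeAsync("SpringBootFlinkJob-async");
        return "JobID: " + client.getJobID();
    }
}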
2. Create the REST controller
// src/main/java/com/example/demo/controller/JobController.java
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.demo.service.FlinkJobService;

@RestController
@RequestMapping("/jobs")
public class JobController {

    @Autowired
    private FlinkJobService flinkJobService;

    @PostMapping("/wordcount")
    public String submitWordCount(@RequestBody List<String> inputs) {
        return flinkJobService.submitWordCountJob(inputs);
    }
}
VI. Key Configuration Notes
1. application.properties
# Custom keys read by the application itself (Spring Boot has no built-in spring.flink.* support)
# Use a local Flink execution environment
spring.flink.local.enabled=true
spring.flink.job.name=SpringBootFlinkJob
# Parallelism (tune to the number of CPU cores)
spring.flink.parallelism=4
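Because these spring.flink.* keys are not interpreted by Spring Boot or Flink on their own, the application has to bind them and apply them to the StreamExecutionEnvironment itself. A minimal sketch of such a binding class follows; the class name FlinkProperties is an assumption for this sketch:

// Hypothetical binder for the custom spring.flink.* keys shown above
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties(prefix = "spring.flink")
public class FlinkProperties {

    /** spring.flink.parallelism */
    private int parallelism = 1;

    private final Job job = new Job();
    private final Local local = new Local();

    public int getParallelism() { return parallelism; }
    public void setParallelism(int parallelism) { this.parallelism = parallelism; }
    public Job getJob() { return job; }
    public Local getLocal() { return local; }

    /** spring.flink.job.name */
    public static class Job {
        private String name = "SpringBootFlinkJob";
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    /** spring.flink.local.enabled */
    public static class Local {
        private boolean enabled = true;
        public boolean isEnabled() { return enabled; }
        public void setEnabled(boolean enabled) { this.enabled = enabled; }
    }
}

A job service can then inject FlinkProperties and call env.setParallelism(flinkProperties.getParallelism()) and env.execute(flinkProperties.getJob().getName()) before building the pipeline.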
2. Resolving dependency conflicts
Exclude the conflicting dependency in pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.17.2</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
VII. Running and Verifying
- Start the Spring Boot application:
mvn spring-boot:run
- Submit a job through the API:
curl -X POST -H "Content-Type: application/json" \
-d '["Hello Flink", "Spring Boot Integration"]' \
http://localhost:8080/jobs/wordcount
- Check the console output (print() prefixes each record with the index of the parallel subtask that produced it):
3> Spring : 1
1> Boot : 1
2> Integration : 1
...
VIII. Production Considerations
- Cluster deployment: package the application and submit the jar to a Flink cluster:
  flink run -c com.example.demo.DemoApplication your-application.jar
- State management: integrate a Flink state backend such as RocksDB (see the sketch after this list)
- Monitoring: expose metrics via Micrometer and Spring Boot Actuator
- Resource isolation: use the YARN or Kubernetes deployment modes
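To make the state-management point concrete, here is a minimal sketch of enabling a RocksDB state backend and periodic checkpointing. It assumes the flink-statebackend-rocksdb dependency is on the classpath; the factory class name and the hdfs:///flink/checkpoints path are placeholders for this sketch:

// Sketch only: RocksDB state backend plus periodic checkpoints
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class CheckpointedEnvironmentFactory {

    public static StreamExecutionEnvironment create() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep keyed state in RocksDB instead of the JVM heap
        // (requires the flink-statebackend-rocksdb dependency).
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // Take a checkpoint every 60 seconds and store it in a durable location (example path).
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");

        return env;
    }

    private CheckpointedEnvironmentFactory() {}
}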
IX. Complete Project Structure
src/
├── main/
│ ├── java/
│ │ ├── com/example/demo/
│ │ │ ├── DemoApplication.java
│ │ │ ├── flink/
│ │ │ │ └── WordCountJob.java
│ │ │ ├── controller/
│ │ │ │ └── JobController.java
│ │ │ ├── service/
│ │ │ │ └── FlinkJobService.java
│ ├── resources/
│ │ └── application.properties
pom.xml
With the steps above you have a working integration of Spring Boot and Apache Flink. This architecture is a good fit wherever real-time stream processing needs to be embedded in a microservice landscape, such as real-time risk control systems or IoT data platforms. It can later be extended with big-data components such as Kafka and HBase, as sketched below.
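As a pointer for the Kafka extension mentioned above, a Kafka-backed source could be wired in roughly as follows. This is only a sketch: it assumes the flink-connector-kafka dependency is added, and the broker address, topic name, group id, and factory class name are placeholders:

// Sketch only: replace the bounded demo source with a Kafka topic
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class KafkaSourceFactory {

    public static DataStream<String> lines(StreamExecutionEnvironment env) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")            // placeholder broker address
                .setTopics("input-topic")                         // placeholder topic
                .setGroupId("spring-boot-flink-demo")             // placeholder group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // The returned stream can be fed into the same WordCount pipeline as above.
        return env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
    }

    private KafkaSourceFactory() {}
}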