当前位置: 首页 > article >正文

Spring Boot集成Akka Stream快速入门Demo

1.什么是Akka Stream?

Akka Streams是一个用于处理和传输元素序列的库。它建立在Akka Actors之上,使流的摄入和处理变得简单。由于它是建立在Akka Actors之上的,它为Akka现有的actor模型提供了一个更高层次的抽象。Akka流由3个主要部分组成--Source、Flow、Sink--任何非循环流至少由2个部分Source、Sink和任意数量的Flow元素组成。这里我们可以说Source和Sink是Flow的特殊情况。这里Flow位于Source和Sink之间,因为它们是应用于Source数据的转换。

stream

Akka流的特点

  • Akka-streams对于快速流数据非常有用。
  • 它避免了管理角色所需的大量模板代码。
  • 它最适合于基于大数据的应用。
  • 由于它是建立在Akka工具包上的,我们将获得所有Akka工具包的好处,如反应性、分布式、位置透明性、集群、Remoting等。
  • 它提供了可重用性,这意味着一旦我们设计了数据流图,我们就可以重复使用它的任何次数。

2.代码工程

 实验目标

熟悉akka stream 相关概念

pom.xml

<!-- Akka Streams -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.13</artifactId>
    <version>2.6.0</version>
</dependency>

config

package com.et.akka.config;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnClass(akka.stream.javadsl.Source.class)
public class AkkaConfig {

  private final ActorSystem system;

  @Autowired
  public AkkaConfig() {
    system = ActorSystem.create("SpringWebAkkaStreamsSystem");
  }

  @Bean
  @ConditionalOnMissingBean(ActorSystem.class)
  public ActorSystem getActorSystem() {
    return system;
  }

}

akka stream

1.源。

这是你的流的入口。每个流中必须至少有一个源。它需要两个类型参数。第一个代表它所发射的数据类型,第二个是它在运行时可以产生的辅助值的类型。如果不产生,我们就使用Akka提供的NotUsed 类型。它只有一个输出点。源可以被认为是发布者

Source<Integer, NotUsed> source = Source.range(1, 100);

2.Sink :

这是你的流的出口点。每个流中必须至少有一个水槽。Sink 是我们流的最后一个元素。基本上,它是一个由源发送/处理的数据的订阅者。通常它将其输入输出到一些系统IO。它是一个流的终点,因此消耗数据。一个汇有一个单一的输入通道,没有输出通道。当我们想以可重复使用的方式指定数据收集器的行为时,特别需要汇,而且不需要评估流。水槽可以被认为是用户。

Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);

3.流:

流是流中的一个处理步骤。它结合了一个传入通道和一个传出通道,以及通过它的消息的一些转换。如果一个流被连接到一个源,一个新的源就是结果。同样地,一个流连接到一个汇,就会产生一个新的汇。而同时与一个源和一个汇相连的流的结果是RunnableFlow 。因此,它们位于输入和输出通道之间,但只要它们不与源或汇相连,它们本身就不对应于其中一种味道。这里,流位于源和汇之间,因为它们是应用于源数据的转换。

Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).filter(MyStream::isPrime);

4.RunnableGraph :

一个两端分别连接到Source和Sink的Flow可以被运行(),被称为RunnableGraph。即使通过连接所有的源、汇和不同的操作符来构建RunnableGraph,也不会有数据流经它。这就是Materialization的作用!

 RunnableGraph<NotUsed> graph = source.to(sink);
 graph.run(actorSystem);

5.Materializer :

Akka流中的流和图就像准备一个蓝图/执行计划。流的物化是将流的描述和分配它所需的所有必要资源的过程,以便运行。这意味着启动处理的Actor,以及根据流的需要,在引擎盖下的更多内容。在运行(物化)RunnableGraph后,我们会得到指定类型的物化值。每个流操作者都可以产生一个物化的值。Akka有.toMat ,以表明我们要转换源和汇的物化值。

source.via(flow).to(sink).run(materializer);

具体类的信息如下:

package com.et.akka.stream;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


import javax.annotation.PostConstruct;
import java.util.concurrent.CompletionStage;

@Component
public class MyStream {
    @Autowired
    private ActorSystem actorSystem;
    @PostConstruct
    public void run() {
        Source<Integer, NotUsed> source = Source.range(1, 10);
        Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(System.out::println);

        RunnableGraph<NotUsed> graph = source.to(sink);

        graph.run(actorSystem);
    }
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.(akka)

3.测试

  • 启动Spring Boot工程
  • 控制台输出所有质数

4.引用

  • https://doc.akka.io/docs/akka/current/stream/stream-quickstart.html
  • Spring Boot集成Akka Stream快速入门Demo | Harries Blog™

http://www.kler.cn/a/303252.html

相关文章:

  • ecmascript:2.模版字符串
  • 【C++】B2112 石头剪子布
  • Linux之进程
  • BI 是如何数据分析的?
  • 在Linux系统中无网络安装Nginx并配置负载均衡
  • 我国无人机新增实名登记110.3 万架,累计完成飞行2666万小时
  • c++stack和list 介绍
  • 20. 如何在MyBatis中处理多表关联查询?常见的实现方式有哪些?
  • 数据分析-26-时间序列预测之基于ARIMA的时间序列数据分析
  • k8s命名详解
  • Redis地理数据类型GEO
  • 通信工程学习:什么是FDMA频分多址
  • Games101笔记-线性代数(一)
  • WORD批量转换器MultiDoc Converter
  • 第 11篇 Helm 部署 RabbitMQ
  • flink的大状态复用
  • C++——一道关于多态的经典面试题
  • 宠物空气净化器应该怎么选择才能选到除毛效果好的产品
  • mysql-搭建主从复制
  • pdf怎么压缩?分享5种压缩PDF文件的方法
  • 《CSS新世界》书评
  • 使用程序集解析的方式内嵌dll到exe中
  • #名词区别篇:npx pnpm npm yarn区别
  • gitlab无法push(pre-receive hook declined)
  • 如何使用 Choreographer 进行帧率优化
  • 旅游网站开发:SpringBoot框架实战