当前位置: 首页 > article >正文

Spring WebFlux 高级实战(3-3)

1、SpringWebFlux 数据库访问

1.1、响应式持久化库的工作原理

        Spring Data 中的响应式存储库通过适配底层数据库驱动来工作。

  • ReactiveMongoRepository:继承了 ReactiveSortingRepository 和 ReactiveQueryByExampleExecutor 等更多通用接口。
  • ReactiveQueryByExampleExecutor:接口可以使用 QBE 语言执行查询。
  • ReactiveSortingRepository:接口扩展了 ReactiveCrudRepository 接口;并添加了 findAll 方法,该方法能对请求查询结果进行排序。
  • ReactiveCrudRepository:声明了用于保存、查找和删除实体的方法。
    • Mono<T> save(T entity) 方法保存 entity ,然后返回所保存的实体。保存操作可能更改整个实体对象。
    • Mono<T> findById(ID id) 操作实体的 id 并返回包装在 Mono 中的结果。
    • findAllById 方法有两个重载方法,其中一个重载方法以 Iterable<ID>集合的形式消费 ID,另一个则采用Publisher<ID>的形式。
    • ReactiveCrudRepository 和 CrudRepository 之间唯一值得注意的区别在于 ReactiveCrudRepository 没有分页且不能进行事务操作

1.1.1、分页支持

        Spring Data 故意省略分页,因为同步存储库中使用的实现方案不适合响应式。主要原因是:

  1. 要计算下一页的参数,需要知道前一个结果的返回记录数。
  2. 要计算分页总数,需要查询记录总数。

        这两个方面都不符合响应式非阻塞范式。另外,通过查询数据库计算总行数不仅相当消耗资源,还在实际数据处理之前增加了延迟。但是,通过将 Pageable 对象传递到存储库,仍然可以获取数据块,如下所示:

pubic interface ReactiveBookRepository extends ReactiveSortingRepository<Book, Long> {
	Flux<Book> findByAuthor(String author, Pageable pageable);
}

        所以,现在请求结果的第二页(索引从 0 开始),其中每页包含 5 个元素:

Flux<Book> result = reactiveBookRepository.findByAuthor("Andy Weir",PageRequest.of(1, 5));

1.1.2、ReactiveMongoRepository 实现细节

        Spring Data MongoDB Reactive 模块只有一个针对 ReactiveMongoRepository 接口的实现,即 SimpleReactiveMongoRepository 类:

  1. 它为 ReactiveMongoRepository 的所有方法提供实现;
  2. 并使用 ReactiveMongoOperations 接口处理所有较低级别的操作。

        如 findAllById(Publisher<ID> ids) 方法的实现:

        此方法使用 buffer 操作收集所有 ids,然后使用 findAllById(Iterable <ID> ids) 方法创建一个请求。该方法构建 Query 对象并调用 findAll(Query query) 方法,该方法触发 ReactiveMongoOperations 实例的 mongoOperations.find(query,...) 。

        insert(Iterable<S> entities) 方法在一个批处理中插入实体。

        insert(Publisher<S> entities) 方法在 flatMap 操作符内生成许多查询。

        但是 findAllById 方法的两个重载方法以相同的方式运行,并且只生成一个数据库查询。saveAll 方法:该方法消费 Publisher 的重载会为每个实体发出查询。

        而其消费 Iterable 的重载,在所有实体都是新的的情况下只发出一个查询,但在其他情况下会为每个实体都发出一个查询。

        deleteAll(Iterable<?extends T> entities) 方法总是为每个实体发出一个查询,即使所有实体在 Iterable 容器中都可用并且不需要等待元素异步显示也是如此。

        如果将 ReactiveCrudRepository 方法与实时生成的实现一起使用,查看实际查询会更加困难。在这种情况下,查询生成的行为方式与普通的同步 CrudRepository 类似。

        RepositoryFactorySupport 为 ReactiveCrudRepository 生成适当的代理。当使用 @Query 注解修饰方法时, ReactiveStringBasedMongoQuery 类用于生成查询。ReactivePartTreeMongoQuery 类用于基于方法名称约定的查询生成。将 ReactiveMongoTemplate 的日志级别设置为 DEBUG 时,可以跟踪发送到 MongoDB 的所有查询。

1.1.3、使用ReactiveMongoTemplate

        即使 ReactiveMongoTemplate 被用作响应式存储库的构建块,该类本身也非常通用。有时它能比高级别的存储库更高效地使用数据库。实现一个简单的服务,该服务使用 ReactiveMongoTemplate 并基于正则表达式来按标题查找图书:

import lombok.RequiredArgsConstructor;
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

@Service
@RequiredArgsConstructor
public class RxMongoTemplateQueryService {
	private static final String BOOK_COLLECTION = "book";
  	// ReactiveMongoTemplate实现该接口,并在配置了 MongoDB 数据源后出现在 Spring 上下文中。
	private final ReactiveMongoOperations mongoOperations;

  	// 使用正则表达式作为搜索条件,并返回包含结果的 Flux
	public Flux<Book> findBooksByTitle(String title) {
		// Query 类和 Criteria 类使用正则表达式构建实际查询
		Query query = Query.query(new Criteria("title")
						.regex(".*" + title + ".*"))
						.limit(100); // 将结果数限制为 100
						// mongoOperations 执行前面构建的查询
						// 查询结果转换为Book.class类型的实体;查询名为 book 的集合
		return mongoOperations.find(query, Book.class, BOOK_COLLECTION);
	}
}

        注意,可以通过提供以下方法签名来实现与普通响应式存储库相同的行为(查询限制除外),该签名遵循如下命名约定: Flux<Book> findManyByTitleRegex(String regex);

        在底层, ReactiveMongoTemplate 使用 ReactiveMongoDatabaseFactory 接口来获取响应式 MongoDB 连接的实例。它使用 MongoConverter 接口的实例将实体转换为文档,反之亦然。MongoConverter 也适用于同步 MongoTemplate 。

        ReactiveMongoTemplate 实现其契约的方式:例如, find(Query query,...) 方法将 org.springframework.data.mongodb.core.query.Query 实例映射到 org.bson.Document 类的实例,MongoDB客户端使用后者工作。

        ReactiveMongoTemplate 使用转换后的查询调用数据库客户端。com.mongodb.reactivestreams.client.MongoClient 类提供了响应式 MongoDB驱动程序的入口点,符合响应式流并通过响应式 Publisher 返回数据。

1.1.4、使用响应式驱动程序(MongoDB)

        Spring Data 中的响应式 MongoDB 连接基于 MongoDB 响应式流 Java 驱动程序构建。该驱动程序提供具有非阻塞背压的异步流处理。

        此外,响应式驱动程序构建在 MongoDB 异步 Java驱动程序之上。异步驱动程序是低级别的,并且具有基于回调的 API,因此它不像较高级别的响应式流驱动程序那样易于使用。除了 MongoDB 响应式流 Java 驱动程序,还有 MongoDB RxJava 驱动程序,而后者基于同一个异步 MongoDB 驱动程序。

        因此,针对MongoDB 连接,Java 生态系统准备了一个同步驱动程序、一个异步驱动程序和两个响应式驱动程序。如果对查询过程控制的需求超过 ReactiveMongoTemplate 所能提供的,可以直接使用响应式驱动程序。通过这种方法,在上述示例中使用纯响应式驱动程序的结果如下:

public class RxMongoDriverQueryService {
	private final MongoClient mongoClient;

  	public Flux<Book> findBooksByTitleRegex(String regex) {
		return Flux.defer(() -> {
					Bson query = Filters.regex(titleRegex);
					return mongoClient
							.getDatabase("test-database")
							.getCollection("book")
							.find(query);
				})
				.map(doc -> new Book(
					doc.getObjectId("id"),
					doc.getString("title"),
					doc.getInteger("pubYear"),
					// ……其他映射程序
				));
	}
}

        返回一个新的 Flux 实例,它将执行过程推迟到实际订阅发生的时间。在 lambda中,使用 com.mongodb.client.model.Filters 辅助类并基于org.bson.conversions.Bson类型定义一个新查询。

        尽管前面的示例中在数据库驱动程序级别进行工作,但在不需要手动处理背压,因为 MongoDB 响应式流 Java 驱动程序已经支持背压处理。
        响应式 MongoDB 连接使用基于批大小的背压需求。虽然这种方法是一种合理的默认设置,但在使用小的需求增量时会生成许多往返。下图强调了响应式 MongoDB 存储库所需的所有抽象层。

1.2、响应式事务

        对于同步处理的情况,事务对象通常保存在 ThreadLocal 容器中。但是 ThreadLocal 不适合用于响应式处理方式,因为用户无法控制线程切换。事务需要将底层资源绑定到物化数据流。在 Project Reactor 中,可以通过 Reactor 上下文来实现这一目标。

1.2.1、基于 MongoDB 4 的响应式事务

        MongoDB 从 4.0 版开始支持多文档事务(multi-document transactions)。Spring Data 没有任何在服务或存储库级别应用响应式事务的功能。但是,可以使用 ReactiveMongoOperations 级别(由 ReactiveMongoTemplate 实现)的事务进行操作。

        首先,多文档事务是 MongoDB 的一项新功能。它仅适用于使用 WiredTiger 存储引擎的非分片副本集。在 MongoDB 4.0 中,没有其他配置支持多文档事务。

        其次,某些 MongoDB 功能在事务中不可用,如,发出元命令和创建集合或索引都是不可能的。同时,隐式创建集合在事务中不起作用。因此,需要设置所需的数据库结构以防止错误。此外,某些命令的行为可能有所不同。在事务提交之前,事务外部不会显示任何数据更新。

        假设必须实现一种用于在用户账户之间转账的钱包服务。每个用户都有自己的账户,且账户余额非负。用户可以将任意金额转给其他用户,但只有在账户中有足够资金时转账才会成功。转账可以并行发生,但在转账时,系统中的资金既不能增加,也不能减少。因此,汇款人钱包的取款操作和收款人钱包的存款操作必须同时且原子地进行。此时可以使用多文档事务。要将一笔款项从账户 A 转账到账户 B,应该执行以下操作:

  1. 启动新事务。
  2. 加载账户 A 的钱包
  3. 加载账户 B 的钱包
  4. 检查账户 A 的钱包中是否有足够的资金
  5. 提取转账金额并计算账户 A 的新余额
  6. 存入转账金额并计算账户 B 的新余额
  7. 保存账户 A 的钱包
  8. 保存账户 B 的钱包
  9. 提交事务

        准备 MongoDB 数据库环境:

主节点配置:mongo_37017.conf

dbpath=/data/mongo/data/server1
bind_ip=0.0.0.0
port=37017
fork=true
logpath=/data/mongo/logs/server1.log
replSet=blnpCluster

从节点1配置:mongo_37018.conf

dbpath=/data/mongo/data/server2
bind_ip=0.0.0.0
port=37018
fork=true
logpath=/data/mongo/logs/server2.log
replSet=blnpCluster
从节点 2 配置: mongo_37019.conf
dbpath=/data/mongo/data/server3
bind_ip=0.0.0.0
port=37019
fork=true
logpath=/data/mongo/logs/server3.log
replSet=lagouCluster

步骤一:创建目录

mkdir -p /data/mongo/data/server1 /data/mongo/data/server2 /data/mongo/data/server3 /data/mongo/logs

步骤二:启动服务

mongod -f 37017.conf
mongod -f 37018.conf
mongod -f 37019.conf

步骤三:查看进程和端口号

ps aux | grep mongo
ss -nelp | grep mongo

步骤四:初始化节点配置

var cfg = {"_id":"lagouCluster", "protocolVersion":1, "members":[
    {"_id":1, "host":"node2:37017", "priority":10},
    {"_id":2, "host":"node2:37018"},
    {"_id":3, "host":"node2:37019"}
]}

rs.initiate(cfg)
rs.status()

# 启用slave节点的读
rs.slaveOk()
# 查看状态
rs.status()
# 增加节点
rs.add("node2:37019")
# 删除节点
rs.remove("node2:37019")

节点说明:

  • PRIMARY 节点:可以查询和新增数据
  • SECONDARY节点:只能查询,不能新增,基于priority权重可以被选为主节点
  • ARBITER节点:不能查询数据和新增数据,不能变为主节点

创建 SpringBoot 项目:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>blnp.net.cn</groupId>
    <artifactId>mongodb-webflux-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mongodb-webflux-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.4.1</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1-jre</version>
        </dependency>
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>testcontainers</artifactId>
            <version>1.15.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

项目示例源码

        测试用例执行后,最终结果大致如下所示:

        源码示例里有用到 Reactor 上下文的使用,即 ReactiveMongoTemplate.inTransaction 方法启动一个新事务并将其放入上下文中。

        因此,在响应流中的任何位置都可以获得用 com.mongodb.reactivestreams.client.ClientSession 接口表示的事务的会话。ReactiveMongoContext.getSession() 辅助方法可以帮助获取会话实例。

        在上述模拟中。从日志中可以明显看出,系统保持了总余额不变,即模拟前后系统中的总金额是相同的。因此,我们通过应用 MongoDB 的响应式事务来实现并发资金转账中的系统完整性(这里跑完显示都是 TX_CONFLICT 状态完成,是因为库实例只使用了一个)。

MongoDB主从多实例:

        可以看到 发生 1w 次转账操作中,在主从实例下 成功了 3565次;金额不足失败的 3099次;发生异常的 3336次且总金额不变。

拓展:MongoDB主从部署

        docker-compose 部署脚本: 

version: "3.5"
services:
  mongodb-primary:
    image: mongo:5.0
    command: --bind_ip_all --replSet mongo-replica
    networks:
      mongodb_network:
        aliases: 
          - mongo-1
    volumes:
      - ./data/mongodb1:/data/modelDb
    ports:
      - 27017:27017

  mongodb-secondary-1:
    image: mongo:5.0
    command: --bind_ip_all --replSet mongo-replica
    networks:
      mongodb_network:
        aliases: 
          - mongo-2
    volumes:
      - ./data/mongodb2:/data/modelDb
    ports:
      - 27018:27017

  mongodb-secondary-2:
    image: mongo:5.0
    command: --bind_ip_all --replSet mongo-replica
    networks:
      mongodb_network:
        aliases: 
          - mongo-3
    volumes:
      - ./data/mongodb3:/data/modelDb
    ports:
      - 27019:27017

networks:
  mongodb_network:

        SpringBoot 配置:

server:
  port: 8084

spring:
  data:
    mongodb:
      ##单个实例配置
#      host: localhost
#      port: 27017
#      database: "wallet"
#      username: "admin"
#      password: "123456"
#      authentication-database: "admin"
      ##主从多实例配置
      uri: "mongodb://admin:admin@10.1.77.3:27017,10.1.77.3:27018,10.1.77.3:27019/wallet"

logging:
  level:
    reactor.core.publisher.FluxUsingWhen: ERROR
#    org.springframework.data.mongodb.core.ReactiveMongoTemplate: DEBUG

        当容器部署成功后,登录到主节点中。进行副本集初始化,把其它两个从节点加入到集群中。进入主节点后执行以下脚本:

mongo --host mongo-1:27017 <<EOF
 var cfg = {
   "_id": "mongo-replica",
   "version": 1,
   "members": [
     {
       "_id": 0,
       "host": "mongo-1:27017",
       "priority": 2
     },
     {
       "_id": 1,
       "host": "mongo-2:27017",
       "priority": 1
     },
     {
       "_id": 2,
       "host": "mongo-3:27017",
       "priority": 0
     }
   ]
 };
 rs.initiate(cfg, { force: true });
 rs.reconfig(cfg, { force: true });
 rs.secondaryOk();
 
 rs.status();
 
 db.getMongo().setReadPref('nearest');
 db.getMongo().setSecondaryOk();
EOF

        执行完成后,创建需要使用的数据库以及用户。执行如下脚本:

#登录数据库
mongo --host mongo-1:27017

#查看所有数据库
show dbs

#创建用户
db.createUser({user:"admin",pwd:"admin",roles:[{"role":"userAdminAnyDatabase","db":"admin"},{"role":"readWrite","db":"testdb"}]})

        或者使用 Navicat 连上数据库进行手动创建库与用户:

        此外,需要留意在一台机器部署时需要对数据库容器的别名进行映射。如下host文件配置:

1.2.2、基于 SAGA 模式的分布式事务

        分布式事务可以以不同方式实现。当然,对于使用响应式范式实现的持久层,这种说法也成立。但是,鉴于 Spring Data 仅支持 MongoDB 4 的响应式事务,并且前面提到的事务支持与 Java 事务 API(Java Transaction API,JTA)不兼容,在响应式微服务中实现分布式事务的唯一可行选择是 SAGA 模式。此外,与其他需要分布式事务的可选模式相比,SAGA 模式具有良好的可伸缩性,更适合响应式流。

1.3、SpringData 响应式连接器

        Spring Data为 4 个 NoSQL 数据库准备了数据库连接器,即 MongoDB、Cassandra、Couchbase 和 Redis。Spring Data 也可能支持其他数据存储,特别是那些利用 Spring WebFlux WebClient 基于 HTTP 进行通信的数据存储。

1.3.1、响应式 MongoDB 连接器

        可以使用 spring-boot-starter-data-mongodb-reactive Spring Boot 启动器模块启用 Spring Data Reactive MongoDB模块。响应式 MongoDB 支持提供了一个响应式存储库和 ReactiveMongoRepository 接口定义基本存储库契约。

        存储库继承了 ReactiveCrudRepository 的所有功能,并添加了对 QBE的支持。MongoDB 存储库支持使用@Query 注解的自定义查询以及带有@Meta 注解的其他查询配置。如果 MongoDB 存储库遵循命名约定,则它支持从方法名称生成查询。

        MongoDB 存储库的另一个显著特性是支持尾游标(tailable cursor)。默认情况下,数据库会在消费了所有结果时自动关闭查询游标。但是,MongoDB 有固定集合(capped collections),这些集合大小固定,支持高吞吐量操作。文档检索基于插入顺序。固定集合的工作方式与循环缓冲区类似。

        固定集合也支持一个尾游标。客户端消费初始查询的所有结果后,此光标保持打开状态,当有人将新文档插入到固定集合中时,尾游标将返回新文档。在 ReactiveMongoRepository 中,由 @Tailable 注解标记的方法会返回由 Flux<Entity> 类型表示的尾游标。

        ReactiveMongoOperations 接口和它的实现类 ReactiveMongoTemplate 级别更低,可以更精细地访问 MongoDB 通信。除此之外,ReactiveMongoTemplate 还支持 MongoDB 的多文档事务。此功能仅适用于WiredTiger 存储引擎的非分片副本集。

        响应式 Spring Data MongoDB 模块构建于响应式流 MongoDB 驱动程序之上,后者实现了响应式规范并在内部使用 Project Reactor。MongoDB 响应式流 Java 驱动程序基于 MongoDB异步 Java 驱动程序构建。

1.3.2、响应式 Redis 连接器

        Spring Data Reactive Redis 模块可以通过导入 spring-boot-starter-data-redis-reactive 启动器来启用。Redis 连接器不提供响应式存储库。

  • ReactiveRedisTemplate 类成为响应式 Redis 数据访问的核心抽象。
  • ReactiveRedisTemplate 实现 ReactiveRedisOperations 接口,并提供所有必需的序列化/反序列化过程。
  • ReactiveRedisConnection 能在与 Redis 通信时使用原始字节缓冲区。
  • ReactiveRedisTemplate 还能订阅 Pub-Sub通道。例如,convertAndSend(String destination, V message)方法将给定消息发布到给定通道,并返回接收消息的客户端数。
  • listenToChannel(String...channels)方法返回一个 Flux,其中包含来自感兴趣通道的消息。

        因此,响应式 Redis 连接器不仅可以实现响应式数据存储,还可以提供消息传递机制。Spring Data Redis 目前集成了 Lettuce 驱动程序。它是 Redis 唯一的响应式 Java 连接器。Lettuce 4.x 版本使用 RxJava 进行底层实现。但是,该库的 5.x 分支切换到了 Project Reactor。

        除 Couchbase 外的所有响应式连接器都具有响应式健康指标。因此,数据库运行状况检查也不应浪费任何服务器资源。

1.4、响应式关系型数据库连接

        响应式关系型数据库连接(Reactive Relational Database Connectivity,R2DBC)是一项探索完全响应式数据库 API 的倡议。R2DBC 在 Spring OnePlatform 2018 会议上被公开,其目标是定义具有背压支持的响应式数据库访问 API。Spring Data 团队在响应式 NoSQL 持久化方面获得了一些先进经验,因此决定提出对真正响应式语言级数据访问 API的愿景。R2DBC 项目包括以下部分:

  • R2DBC 服务提供程序接口(Service Provider Interface,SPI)定义了实现驱动程序的简约 API,便于彻底减少驱动程序实现者必须遵守的 API。SPI 不适合在应用程序代码中直接使用,需要专用的客户端库。
  • R2DBC 客户端提供了人性化的 API 和帮助类,可将用户请求转换为 SPI 级别。R2DBC 客户端对R2DBC SPI 的作用与 Jdbi 库对 JDBC 的作用相同。
  • R2DBC PostgreSQL 实现为 PostgreSQL 提供了 R2DBC 驱动程序。使用 Netty 框架通过 PostgreSQL 连接协议进行异步通信。背压既可以通过 TCP 流控制,也可以通过被称为门户(portal)的 PostgreSQL 特性来实现,后者实际上是一个查询内的光标。门户能完美地转换为响应式流。

1.4.1、基于 Spring Data R2DBC 使用 R2DBC

        Spring Data JDBC 模块中提供了基于R2DBC的 ReactiveCrudRepository 接口。SimpleR2dbcRepository 类使用R2DBC 实现 ReactiveCrudRepository 接口。SimpleR2dbcRepository 类不使用默认的 R2DBC 客户端,而是定义自己的客户端以使用 R2DBC SPI。背压问题在 R2DBC SPI 级别得到了解决。

1.4.2、使用R2DBC操作PostgreSQL案例

1、搭建PostgreSQL数据库

rpm安装部署:

# 安装postgre的共享库
sudo rpm -ivh postgresql13-libs-13.1-1PGDG.rhel7.x86_64.rpm
# 安装依赖
sudo yum install libicu -y
# 安装postgre客户端
sudo rpm -ivh postgresql13-13.1-1PGDG.rhel7.x86_64.rpm
# 安装postgre的服务端
sudo rpm -ivh postgresql13-server-13.1-1PGDG.rhel7.x86_64.rpm

# 修改postgres用户的密码
# 指定postgres的数据目录
# 在postgres家目录创建data目录
-bash-4.2$ mkdir data
# /var/lib/pgsql/data
# pg用户操作:初始化数据库
bash-4.2$ /usr/pgsql-13/bin/initdb -D /var/lib/pgsql/data
-bash-4.2$ vim /var/lib/pgsql/data/postgresql.conf
# 添加内容:listen_addresses='*'
-bash-4.2$ vim /var/lib/pgsql/data/pg_hba.conf
# 添加内容:host all postgres 192.168.100.1/32 trust
# 启动postgresql
bash-4.2$ /usr/pgsql-13/bin/pg_ctl -D /var/lib/pgsql/data -l logfile start
# 登录数据库
bash-4.2$ /usr/pgsql-13/bin/psql
psql (13.1)
Type "help" for help.
# 退出数据库登录
postgres=# \q
# 停止postgresql
bash-4.2$ /usr/pgsql-13/bin/pg_ctl stop -m fast -D /var/lib/pgsql/data

docker-compose部署安装:

version: “3.8”
services:
  postgresql:
    container_name: postgresql-125
    image: postgres:12.5
    environment:
      POSTGRES_DB: postgres
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: 1234
    volumes:
      - ./pgdata:/var/lib/postgresql/data
    ports:
      - "3432:5432"
    healthcheck:
      test: [ "CMD", "pg_isready", "-q", "-d", "postgres", "-U", "postgres" ]
      timeout: 45s
      interval: 10s
      retries: 10
2、项目配置使用

完整源代码:

        使用spring data r2dbc执行插入的时候,如果设置了主键,则认为该数据存在,需要执行的是更新。如果没有设置主键,则需要使用自增主键,此时插入数据的时候不设置主键值,系统认为是数据的插入。设置主键自增的方法:

-- 使用SERIAL
CREATE TABLE users
(
id SERIAL primary key ,
name character varying,
password character varying
)


-- 自动创建名为users_id_seq的序列,且MAXVALUE=9223372036854775807


-- 先创建序列,然后设置字段的自增
CREATE SEQUENCE users_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;


alter table users alter column id set default nextval('users_id_seq');

特别说明:PostgreSQL字段名称一定不要使用大写字母!

1.5、SpringData R2DBC 集成 MySQL

项目源代码示例:

1.6、SpringWebFlux 集成 MongoDB

完整源代码:

1.7、SpringWebFlux 集成 Redis

完整源代码示例:

package blnp.net.cn.handler;

import blnp.net.cn.entity.City;
import blnp.net.cn.repository.CityRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;

/**
 * @ClassName blnp.net.cn.handler.CityHandler
 * @Description <pre></pre>
 * @Author lyb 2045165565@qq.com
 * @CreateDate 2024/12/27 17:29
 * @Version v1.01
 * @ModifyRecord <pre>
 * 版本 修改人 修改时间 修改内容描述
 * ----------------------------------------------
 * 1.00 liaoyibin 2024/12/27 17:29 新建
 * ----------------------------------------------
 * </pre>
 */
@Component
public class CityHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(CityHandler.class);

    @Resource
    private RedisTemplate redisTemplate;

    private final CityRepository cityRepository;

    @Autowired
    public CityHandler(CityRepository cityRepository) {
        this.cityRepository = cityRepository;
    }

    public Mono<City> save(City city) {
        return cityRepository.save(city);
    }

    /**
     * 如果缓存存在,从缓存中获取城市信息;
     * 如果缓存不存在,从 DB 中获取城市信息,然后插入缓存
     *
     * @param id
     * @return
     */
    public Mono<City> findCityById(Long id) {
        // 从缓存中获取城市信息
        String key = "city_" + id;
        ValueOperations<String, City> operations = redisTemplate.opsForValue();
        // 缓存存在
        boolean hasKey = redisTemplate.hasKey(key);
        if (hasKey) {
            City city = operations.get(key);

            LOGGER.info("CityHandler.findCityById() : 从缓存中获取了城市 >> " + city.toString());
            return Mono.create(cityMonoSink -> cityMonoSink.success(city));
        }

        // 从 MongoDB 中获取城市信息
        Mono<City> cityMono = cityRepository.findById(id);
        if (cityMono == null) {
            return cityMono;
        }

        // 插入缓存
        cityMono.subscribe(cityObj -> {
            operations.set(key, cityObj);
            LOGGER.info("CityHandler.findCityById() : 城市插入缓存 >> " + cityObj.toString());
        });

        return cityMono;
    }

    public Flux<City> findAllCity() {
        return cityRepository.findAll().cache();
    }

    public Mono<City> modifyCity(City city) {
        Mono<City> cityMono = cityRepository.save(city);
        // 缓存存在,删除缓存
        String key = "city_" + city.getId();
        boolean hasKey = redisTemplate.hasKey(key);
        if (hasKey) {
            redisTemplate.delete(key);

            LOGGER.info("CityHandler.modifyCity() : 从缓存中删除城市 ID >> " + city.getId());
        }

        return cityMono;
    }

    /**
     * 如果缓存存在,删除;
     * 如果缓存不存在,不操作
     */
    public Mono<Long> deleteCity(Long id) {
        cityRepository.deleteById(id);
        String key = "city_" + id;
        boolean hasKey = redisTemplate.hasKey(key);
        if (hasKey) {
            redisTemplate.delete(key);
            LOGGER.info("CityHandler.deleteCity() : 从缓存中删除城市 ID >> " + id);
        }
        return Mono.create(cityMonoSink -> cityMonoSink.success(id));
    }

}

1.8、RxJava 持久库转响应库

示例项目完整代码:

        David Moten创建的rxjava2-jdbc库,可以非阻塞、响应式的方式封装 JDBC 驱动。该库基于 RxJava 2 构建,并使用专用线程池和非阻塞连接池,因此,请求不会在等待连接可用时阻塞线程。一旦连接可用,查询就开始在连接上执行并阻塞线程。该库具有流式 DSL,可以发送 SQL 语句并以响应式流的方式接收结果。

示例SQL:

-- 书本
CREATE TABLE `book` (
  `id` varchar(64) NOT NULL,
  `title` varchar(255) NOT NULL,
  `publishing_year` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

INSERT INTO `book` (`id`, `title`, `publishing_year`) VALUES ('11111111-1967-2343-7777-000000000000', 'The Case for Mars', 1996);
INSERT INTO `book` (`id`, `title`, `publishing_year`) VALUES ('98cfd43e-1967-47a1-ace7-8399a29866a0', 'The Martian', 2011);
INSERT INTO `book` (`id`, `title`, `publishing_year`) VALUES ('99999999-1967-47a1-aaaa-8399a29866a0', 'The War of Worlds', 1897);
INSERT INTO `book` (`id`, `title`, `publishing_year`) VALUES ('99cfd43e-1111-3333-ace7-8399a29866a0', 'Blue Mars', 1996);
INSERT INTO `book` (`id`, `title`, `publishing_year`) VALUES ('99cfd43e-1967-3344-e34a-222222233333', 'Edison\'sConquest of Mars', 1947);

Maven坐标:

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-webflux</artifactId>
	</dependency>
	<dependency>
		<groupId>io.reactivex.rxjava2</groupId>
		<artifactId>rxjava</artifactId>
		<version>2.2.20</version>
	</dependency>
	<dependency>
		<groupId>com.github.davidmoten</groupId>
		<artifactId>rxjava2-jdbc</artifactId>
		<version>0.2.7</version>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
	</dependency>
	<dependency>
		<groupId>com.google.guava</groupId>
		<artifactId>guava</artifactId>
		<version>30.1-jre</version>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.49</version>
	</dependency>
</dependencies>

转换核心代码:

 

package cn.blnp.net.studyboot.demo.repository;

import cn.blnp.net.studyboot.demo.service.IBookService;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Single;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.davidmoten.rx.jdbc.Database;
import org.davidmoten.rx.jdbc.tuple.Tuple2;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2025/1/15 16:24
 */
@Component
@RequiredArgsConstructor
public class RxBookRepository {

    /**
     *  新增SQL
     **/
    private static final String SAVE_QUERY =
            "insert into book (id, title, publishing_year) " +
                    "values(:id, :title, :publishing_year) " +
                    "on duplicate key " +
                    "update title=:title, publishing_year=:publishing_year";

    /**
     *  主键查下SQL
     **/
    private static final String SELECT_BY_ID =
            "select * from book where id=:id";

    /**
     *  标题查询SQL
     **/
    private static final String SELECT_BY_TITLE =
            "select * from book where title=:title";

    /**
     *  时间查询SQL
     **/
    private static final String SELECT_BY_YEAR_BETWEEN =
            "select * from book where " +
                    "publishing_year >= :from and publishing_year <= :to";

    private final Database database;

    /**
     * 用途:RxJava 流转为 React 规范的流
     * @author liaoyibin
     * @since 16:44 2025/1/15
     * @params [books]
     * @param books
     * @return io.reactivex.Flowable<cn.blnp.net.studyboot.demo.service.IBookService>
    **/
    public Flowable<IBookService> save(Flowable<IBookService> books) {
        return books
                .flatMap(book -> save(book).toFlowable());
    }

    /**
     * 用途:RxJava 数据新增接口
     * @author liaoyibin
     * @since 16:45 2025/1/15
     * @params [book]
     * @param book
     * @return io.reactivex.Single<cn.blnp.net.studyboot.demo.service.IBookService>
    **/
    public Single<IBookService> save(IBookService book) {
        return database
                .update(SAVE_QUERY)
                .parameter("id",book.id())
                .parameter("title",book.title())
                .parameter("publishing_year",book.publishing_year())
                .counts()
                .ignoreElements()
                .andThen(Single.just(book));
    }

    /**
     * 用途:查询所有
     * @author liaoyibin
     * @since 16:46 2025/1/15
     * @params []
     * @param
     * @return io.reactivex.Flowable<cn.blnp.net.studyboot.demo.service.IBookService>
    **/
    public Flowable<IBookService> findAll() {
        return database
                .select(IBookService.class)
                .get();
    }

    /**
     * 用途:根据ID查询
     * @author liaoyibin
     * @since 16:47 2025/1/15
     * @params [id]
     * @param id
     * @return io.reactivex.Maybe<cn.blnp.net.studyboot.demo.service.IBookService>
    **/
    public Maybe<IBookService> findById(String id) {
        return database
                .select(SELECT_BY_ID)
                .parameter("id", id)
                .autoMap(IBookService.class)
                .firstElement();
    }

    /**
     * 用途:标题查询
     * @author liaoyibin
     * @since 16:48 2025/1/15
     * @params [titlePublisher]
     * @param titlePublisher
     * @return io.reactivex.Maybe<cn.blnp.net.studyboot.demo.service.IBookService>
    **/
    public Maybe<IBookService> findByTitle(Publisher<String> titlePublisher) {
        return Flowable.fromPublisher(titlePublisher)
                .firstElement()
                .flatMap(title -> database
                        .select(SELECT_BY_TITLE)
                        .parameter("title", title)
                        .autoMap(IBookService.class)
                        .firstElement());
    }

    /**
     * 用途:时间查询并转换流
     * @author liaoyibin
     * @since 16:49 2025/1/15
     * @params [from, to]
     * @param from
     * @param to
     * @return io.reactivex.Flowable<cn.blnp.net.studyboot.demo.service.IBookService>
    **/
    public Flowable<IBookService> findByYearBetween(
            Single<Integer> from,
            Single<Integer> to
    ) {
        return Single
                .zip(from, to, Tuple2::new)
                .flatMapPublisher(tuple -> database
                        .select(SELECT_BY_YEAR_BETWEEN)
                        .parameter("from", tuple._1())
                        .parameter("to", tuple._2())
                        .autoMap(IBookService.class));
    }
}

单元测试验证:

package cn.blnp.net.studyboot;

import cn.blnp.net.studyboot.demo.repository.RxBookRepository;
import cn.blnp.net.studyboot.demo.service.IBookService;
import io.reactivex.Flowable;
import io.reactivex.Single;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.HashSet;
import java.util.Set;

@SpringBootTest
class StudyBootApplicationTests {

    @Resource
    RxBookRepository repository;

    @Test
    void contextLoads() {
    }

    @Test
    public void testFindAll() throws InterruptedException {
        final Flowable<IBookService> allBooks = repository.findAll();
        allBooks.subscribe(
                book -> System.out.println(book),
                ex -> {
                    System.err.println(ex);
                },
                () -> {
                    System.out.println("执行完成");
                }
        );
        Thread.sleep(5000);
    }

    @Test
    public void testFindById() throws InterruptedException {
        repository.findById("99cfd43e-1111-3333-ace7-8399a29866a0")
                .subscribe(
                        book -> System.out.println(book),
                        ex -> {
                            System.err.println(ex);
                        },
                        () -> {
                            System.out.println("执行完成");
                        }
                );
        Thread.sleep(5000);
    }

    @Test
    public void testFindByTitle() throws InterruptedException {
        Set<String> names = new HashSet<>();
        names.add("The Case for Mars");
        repository.findByTitle(Flowable.fromIterable(names))
                .subscribe(
                        book -> System.out.println(book),
                        ex -> {
                            System.err.println(ex);
                        },
                        () -> {
                            System.out.println("执行完成");
                        }
                );
        Thread.sleep(5000);
    }

    @Test
    public void testFindByYearBetween() throws InterruptedException {
        repository.findByYearBetween(Single.just(1990), Single.just(2020))
                .subscribe(
                        book -> System.out.println(book),
                        ex -> {
                            System.err.println(ex);
                        },
                        () -> {
                            System.out.println("执行完成");
                        }
                );
        Thread.sleep(5000);
    }

    @Test
    public void testSave1() throws InterruptedException {
        repository.save(new IBookService() {
            @Override
            public String id() {
                return "mybook-1";
            }
            @Override
            public String title() {
                return "mytitle-1";
            }
            @Override
            public Integer publishing_year() {
                return 2020;
            }
        }).subscribe();
        Thread.sleep(2000);
    }

    @Test
    public void testSave2() {
        repository.save(Flowable.just(new IBookService() {
            @Override
            public String id() {
                return "mybook-2";
            }
            @Override
            public String title() {
                return "mytitle-2";
            }
            @Override
            public Integer publishing_year() {
                return 2021;
            }
        })).blockingFirst();
    }
}

1.9、CrudRepository 包装

示例项目完整代码:

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-webflux</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-data-jpa</artifactId>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
	</dependency>
	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.49</version>
	</dependency>
</dependencies>

响应式接口适配器(核心):

package cn.blnp.net.reactboot.adapter;

import lombok.RequiredArgsConstructor;
import org.reactivestreams.Publisher;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/**
 * <p>响应式适配器</p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2025/1/15 17:43
 */
@RequiredArgsConstructor
public abstract class ReactiveCrudRepositoryAdapter<T,ID,I extends CrudRepository<T,ID>> implements ReactiveCrudRepository<T,ID> {

    /**
     *  常规CRUD的接口实现
     **/
    protected final I delegate;
    protected final Scheduler scheduler;

    /**
     * 用途:将常规的crud保存方法转为响应式接口的实现
     * @author liaoyibin
     * @since 17:54 2025/1/15
     * @params [entity]
     * @param entity
     * @return reactor.core.publisher.Mono<S>
    **/
    @Override
    public <S extends T> Mono<S> save(S entity) {
        return Mono.fromCallable(() -> delegate.save(entity))
                .subscribeOn(scheduler);
    }

    /** 以下方法同理 **/

    @Override
    public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
        return Mono.fromCallable(() -> delegate.saveAll(entities))
                .flatMapMany(Flux::fromIterable)
                .subscribeOn(scheduler);
    }

    @Override
    public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
        return Flux.from(entityStream)
                .flatMap(entity -> Mono.fromCallable(() -> delegate.save(entity)))
                .subscribeOn(scheduler);
    }

    @Override
    public Mono<T> findById(ID id) {
        return Mono.fromCallable(() -> delegate.findById(id))
                .flatMap(result -> result
                        .map(Mono::just)
                        .orElseGet(Mono::empty))
                .subscribeOn(scheduler);
    }

    @Override
    public Mono<T> findById(Publisher<ID> id) {
        return Mono.from(id)
                .flatMap(actualId ->
                        delegate.findById(actualId)
                                .map(Mono::just)
                                .orElseGet(Mono::empty))
                .subscribeOn(scheduler);
    }

    @Override
    public Mono<Boolean> existsById(ID id) {
        return Mono
                .fromCallable(() -> delegate.existsById(id))
                .subscribeOn(scheduler);
    }

    @Override
    public Mono<Boolean> existsById(Publisher<ID> id) {
        return Mono.from(id)
                .flatMap(actualId ->
                        Mono.fromCallable(() -> delegate.existsById(actualId)))
                .subscribeOn(scheduler);
    }

    @Override
    public Flux<T> findAll() {
        return Mono
                .fromCallable(delegate::findAll)
                .flatMapMany(Flux::fromIterable)
                .subscribeOn(scheduler);
    }

    @Override
    public Flux<T> findAllById(Iterable<ID> ids) {
        return Mono
                .fromCallable(() -> delegate.findAllById(ids))
                .flatMapMany(Flux::fromIterable)
                .subscribeOn(scheduler);
    }

    @Override
    public Flux<T> findAllById(Publisher<ID> idStream) {
        return Flux
                .from(idStream)
                .buffer()
                .flatMap(ids -> Flux.fromIterable(delegate.findAllById(ids)))
                .subscribeOn(scheduler);
    }

    @Override
    public Mono<Long> count() {
        return Mono
                .fromCallable(delegate::count)
                .subscribeOn(scheduler);
    }

    @Override
    public Mono<Void> deleteById(ID id) {
        return Mono
                .<Void>fromRunnable(() -> delegate.deleteById(id))
                .subscribeOn(scheduler);
    }

    @Override
    public Mono<Void> deleteById(Publisher<ID> id) {
        return Mono.from(id)
                .flatMap(actualId ->
                        Mono
                                .<Void>fromRunnable(() -> delegate.deleteById(actualId))
                                .subscribeOn(scheduler)
                );
    }

    @Override
    public Mono<Void> delete(T entity) {
        return Mono
                .<Void>fromRunnable(() -> delegate.delete(entity))
                .subscribeOn(scheduler);
    }

    @Override
    public Mono<Void> deleteAll(Iterable<? extends T> entities) {
        return Mono
                .<Void>fromRunnable(() -> delegate.deleteAll(entities))
                .subscribeOn(scheduler);
    }

    @Override
    public Mono<Void> deleteAll(Publisher<? extends T> entityStream) {
        return Flux.from(entityStream)
                .flatMap(entity -> Mono
                        .fromRunnable(() -> delegate.delete(entity))
                        .subscribeOn(scheduler))
                .then();
    }

    @Override
    public Mono<Void> deleteAll() {
        return Mono
                .<Void>fromRunnable(delegate::deleteAll)
                .subscribeOn(scheduler);
    }

    @Override
    public Mono<Void> deleteAllById(Iterable<? extends ID> ids) {
        return Mono
                .<Void>fromRunnable(() -> delegate.deleteAllById(ids))
                .subscribeOn(scheduler);
    }
}

常规接口定义:

package cn.blnp.net.reactboot.repository;

import cn.blnp.net.reactboot.entity.BookEntity;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Repository;

/**
 * <p>常规CRUD接口</p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2025/1/15 17:41
 */
@Repository
public interface BookJpaRepository extends CrudRepository<BookEntity,Integer> {

    /**
     * 用途:根据ID查询
     * @author liaoyibin
     * @since 17:42 2025/1/15
     * @params [lower, upper]
     * @param lower
     * @param upper
     * @return java.lang.Iterable<cn.blnp.net.reactboot.entity.BookEntity>
    **/
    Iterable<BookEntity> findByIdBetween(int lower, int upper);

    /**
     * 用途:根据标题查询
     * @author liaoyibin
     * @since 17:43 2025/1/15
     * @params []
     * @param
     * @return java.lang.Iterable<cn.blnp.net.reactboot.entity.BookEntity>
    **/
    @Query("SELECT b FROM BookEntity b WHERE " +
            "LENGTH(b.title)=(SELECT MIN(LENGTH(b2.title)) FROM BookEntity b2)")
    Iterable<BookEntity> findShortestTitle();
}

响应式接口声明:

package cn.blnp.net.reactboot.repository;

import cn.blnp.net.reactboot.adapter.ReactiveCrudRepositoryAdapter;
import cn.blnp.net.reactboot.entity.BookEntity;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/**
 * <p>响应式的接口实现</p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2025/1/15 17:56
 */
@Component
public class RxBookRepository extends
        ReactiveCrudRepositoryAdapter<BookEntity, Integer, BookJpaRepository> {

    public RxBookRepository(BookJpaRepository delegate, Scheduler scheduler) {
        super(delegate, scheduler);
    }

    /**
     * 用途:根据ID区间查询
     * @author liaoyibin
     * @since 18:06 2025/1/15
     * @params [lowerPublisher, upperPublisher]
     * @param lowerPublisher
     * @param upperPublisher
     * @return reactor.core.publisher.Flux<cn.blnp.net.reactboot.entity.BookEntity>
    **/
    public Flux<BookEntity> findByIdBetween(Publisher<Integer> lowerPublisher, Publisher<Integer> upperPublisher) {
        return Mono.zip(
                Mono.from(lowerPublisher),
                Mono.from(upperPublisher)
        ).flatMapMany(
                item ->
                        Flux.fromIterable(delegate.findByIdBetween(item.getT1(), item.getT2()))
                                .subscribeOn(scheduler)
        ).subscribeOn(scheduler);
    }

    /**
     * 用途:根据标题查询
     * @author liaoyibin
     * @since 18:06 2025/1/15
     * @params []
     * @param
     * @return reactor.core.publisher.Flux<cn.blnp.net.reactboot.entity.BookEntity>
    **/
    public Flux<BookEntity> findShortestTitle() {
        return Mono.fromCallable(delegate::findShortestTitle)
                .subscribeOn(scheduler)
                .flatMapMany(Flux::fromIterable);
    }
}

验证测试:

package cn.blnp.net.reactboot;

import cn.blnp.net.reactboot.config.RxPersistenceConfiguration;
import cn.blnp.net.reactboot.entity.BookEntity;
import cn.blnp.net.reactboot.repository.RxBookRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;

/**
 * @author user
 */
@Slf4j
@EnableJpaRepositories
@RequiredArgsConstructor
@SpringBootApplication
@Import({
        RxPersistenceConfiguration.class
})
public class ReactBootApplication implements CommandLineRunner {

    private final RxBookRepository bookRepository;

    public static void main(String[] args) {
        SpringApplication.run(ReactBootApplication.class, args);
    }

    @Override
    public void run(String... args) {
        Flux<BookEntity> books = Flux.just(
                new BookEntity("The Martian", 2011),
                new BookEntity("Blue Mars", 1996),
                new BookEntity("The War of the Worlds", 1898),
                new BookEntity("Artemis", 2016),
                new BookEntity("The Expanse: Leviathan Wakes", 2011),
                new BookEntity("The Expanse: Caliban's War", 2012)
        );

        bookRepository
                .saveAll(books)
                .count()
                .doOnNext(amount -> log.info("{} books saved in DB", amount))
                .block();

        Flux<BookEntity> allBooks = bookRepository.findAll();
        reportResults("All books in DB:", allBooks);

        Flux<BookEntity> andyWeirBooks = bookRepository
                .findByIdBetween(Mono.just(17), Mono.just(22));
        reportResults("Books with ids (17..22):", andyWeirBooks);

        Flux<BookEntity> booksWithFewAuthors = bookRepository.findShortestTitle();
        reportResults("Books with the shortest title:", booksWithFewAuthors);

        Mono.delay(Duration.ofSeconds(5))
                .subscribe(e -> log.info("Application finished successfully!"));
    }

    /**
     * 用途:输出执行结果
     * @author liaoyibin
     * @since 18:10 2025/1/15
     * @params [message, books]
     * @param message
     * @param books
     * @return void
    **/
    private void reportResults(String message, Flux<BookEntity> books) {
        books.map(BookEntity::toString)
                .reduce(
                        new StringBuffer(),
                        (sb, b) -> sb.append(" - ")
                                .append(b)
                                .append("\n"))
                .doOnNext(sb -> log.info(message + "\n{}", sb))
                .subscribe();
    }
}


http://www.kler.cn/a/505646.html

相关文章:

  • (即插即用模块-Attention部分) 四十四、(ICIP 2022) HWA 半小波注意力
  • Hive集群的安装准备
  • TCP 连接状态标识 | SYN, FIN, ACK, PSH, RST, URG
  • 代码随想录算法训练营第十二天|第18题. 四数之和
  • 关于linux的ld.so.conf.d
  • 信号与系统初识---信号的分类
  • android Recyclerview viewholder统一封装
  • Android Auto能够与Android设备整合的几项功能有哪些?
  • PostgreSQL-WAL日志介绍(二)
  • STM32-笔记43-低功耗
  • 机器学习(2):线性回归Python实现
  • npm更换淘宝镜像源
  • AI 编程工具—Cursor进阶使用 阅读开源项目
  • 2025网络架构
  • HTML5 Canvas实现的跨年烟花源代码
  • 【conda】迁移到其他ubuntu机器
  • OSPF - 特殊报文与ospf的机制
  • replaceState和vue的router.replace删除query参数的区别
  • 无人机航拍价格 航拍价格
  • 内存快照:宕机后Redis如何实现快速恢复?
  • 基于RFM聚类与随机森林算法的智能手机用户监测数据案例分析
  • Shell脚本一键推送到钉钉告警并@指定人
  • Nginx 如何设置 Upgrade-Insecure-Requests 报头 ?
  • python tkinter做界面 SDK打开海康工业相机,callback取图,halcon显示
  • 一文了解如何使用 DBeaver 管理 DolphinDB
  • 1_CSS3 边框 --[CSS3 进阶之路]