当前位置: 首页 > article >正文

响应式编程库(三) -r2dbc

r2dbc整合

  • 什么是r2dbc
  • 版本选择
  • 简单试用
  • 整合springboot
    • DatabaseClient 进行查询
    • 使用Repository接口(对应mapper)
      • 实体类
      • 复杂查询(一对一)实体类转换器
      • 测试代码
      • 一对多关系

什么是r2dbc

反应式关系数据库连接(R2DBC)项目为关系数据库带来了反应式编程API。

基于Reactive Streams规范。R2DBC建立在Reactive Streams规范之上,它提供了一个完全反应式的非阻塞API

r2dbc 官网:https://r2dbc.io/
github: r2dbc-mysql 版本
spring-data r2dbc

版本选择

参考下表来确定适合你的项目的r2 dbc-mysql版本。

spring-boot-starter-data-r2dbcspring-data-r2dbcr2dbc-spir2dbc-mysql(recommended)
3.0.* and above3.0.* and above1.0.0.RELEASEio.asyncer:r2dbc-mysql:1.2.0
2.7.*1.5.*0.9.1.RELEASEio.asyncer:r2dbc-mysql:0.9.7
2.6.* and below1.4.* and below0.8.6.RELEASEdev.miku:r2dbc-mysql:0.8.2

简单试用

        <!-- https://mvnrepository.com/artifact/dev.miku/r2dbc-mysql -->
        <dependency>
            <groupId>dev.miku</groupId>
            <artifactId>r2dbc-mysql</artifactId>
            <version>0.8.2.RELEASE</version>
        </dependency>

 @Test
    void connection() throws IOException {

        // r2dbc基于全异步、响应式、消息驱动
        // jdbc:mysql://localhost:3306/test
        // r2dbc:mysql://localhost:3306/test

        //0、MySQL配置
        MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builder()
                .host("192.168.xx.xx")
                .port(3306)
                .username("root")
                .password("123456")
                .database("test")
                .connectTimeout(Duration.ofSeconds(3))
                .build();

        //1、获取连接工厂
        MySqlConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);


        //2、获取到连接,发送sql
        Mono<Connection> connectionMono = Mono.from(connectionFactory.create());

        // JDBC: Statement: 封装sql的
        //3、数据发布者
        connectionMono.flatMapMany(connection ->
                                    connection
//                                    .createStatement("INSERT INTO `t_book` (`publisher`, `author`) VALUES ('who', '1')")
                                    .createStatement("select * from t_book where id=?id and publisher=?")
                                    .bind("id", 1L) //具名参数
                                    .bind(1, "pub")
                                    .execute()
        ).flatMap(result -> {
			// 不同版本,api有所不一致
            return result.map((readable,book)->{
                System.out.println("readable:"+readable);
                System.out.println("book:"+book);
                        Long id = readable.get("id", Long.class);
                        String publisher = readable.get("publisher", String.class);
                        Long author = readable.get("author", Long.class);
                        return new TBook(author,publisher,id);
            });

        }).subscribe(tAuthor -> System.out.println("book = " + tAuthor));


        //背压; 不用返回所有东西,基于请求量返回;

        System.in.read();


    }

结果:

readable:dev.miku.r2dbc.mysql.MySqlRow@34579a88
book:MySqlRowMetadata{metadata=[MySqlColumnMetadata{index=0, type=8, name='id', definitions=4203, nullability=NON_NULL, size=20, decimals=0, collationId=63}, MySqlColumnMetadata{index=1, type=253, name='publisher', definitions=1001, nullability=NON_NULL, size=1020, decimals=0, collationId=45}, MySqlColumnMetadata{index=2, type=3, name='author_id', definitions=1001, nullability=NON_NULL, size=11, decimals=0, collationId=63}, MySqlColumnMetadata{index=3, type=12, name='create_time', definitions=81, nullability=NON_NULL, size=19, decimals=0, collationId=63}], sortedNames=[author_id, create_time, id, publisher]}
book = TBook(authorId=1, publisher=pub, id=1)

整合springboot

        <!-- https://mvnrepository.com/artifact/io.asyncer/r2dbc-mysql -->
        <dependency>
            <groupId>io.asyncer</groupId>
            <artifactId>r2dbc-mysql</artifactId>
            <version>1.0.5</version>
        </dependency>
        <!--        响应式 Spring Data R2dbc-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>

DatabaseClient 进行查询

    @Autowired  //贴近底层,join操作好做; 复杂查询好用
    DatabaseClient databaseClient; //数据库客户端
 @Test
    void databaseClient() throws IOException {

        // 底层操作
        databaseClient
                .sql("select * from t_author")
//                .bind(0,2L)
                .fetch() //抓取数据
                .all()//返回所有
                .map(map -> {  //map == bean  属性=值
                    System.out.println("map = " + map);
                    String id = map.get("id").toString();
                    String name = map.get("name").toString();
                    return new TAuthor(Long.parseLong(id), name, null);
                })
                .subscribe(tAuthor -> System.out.println("tAuthor = " + tAuthor));
        System.in.read();


    }


spring:
  r2dbc:
    password: 123456
    username: root
    url: r2dbc:mysql://localhost:3306/test
    name: test

使用Repository接口(对应mapper)

/**
* TAuthor : 对应实体类;   Long  主键类型
*/
@Repository
public interface AuthorRepositories extends R2dbcRepository<TAuthor,Long> {

    //默认继承了一堆CRUD方法; 像mybatis-plus

    //QBC: Query By Criteria
    //QBE: Query By Example

    //成为一个起名工程师  where id In () and name like ?
    //仅限单表复杂条件查询。 不用编写sql!!!根据方法名自动生成sql
    Flux<TAuthor> findAllByIdInAndNameLike(Collection<Long> id, String name);

    //多表复杂查询

    @Query("select * from t_author") //自定义query注解,指定sql语句
    Flux<TAuthor> findHaha();


    // 1-1关联关系; 查出这本图书以及它的作者
    @Query("select b.*,t.name as name from t_book b" +
            " LEFT JOIN t_author t on b.author_id = t.id " +
            " WHERE b.id = :bookId")
    Mono<TBookAuthor> authorBook(@Param("bookId") Long bookId);


//    @Query("SELECT * FROM person WHERE lastname = :lastname")
//    Flux<Person> findByLastname(String lastname);
//
//    @Query("SELECT firstname, lastname FROM person WHERE lastname = $1")
//    Mono<Person> findFirstByLastname(String lastname);


}

实体类

@Data
@AllArgsConstructor
@NoArgsConstructor

@Table("t_book")
public class TBook {
    
    Long authorId;

    
    String publisher;

    
    @Id
    Long id;
 }
@Table("t_author")
@NoArgsConstructor
@AllArgsConstructor
@Data
public class TAuthor {

    @Id
    private Long id;
    private String name;

    //1-N如何封装
    @Transient //临时字段,并不是数据库表中的一个字段
//    @Field(exist=false)
    private List<TBook> TBooks;
}

@Table("t_book")
@Data
public class TBookAuthor {
    
    @Id
    Long id;

    
    Long authorId;

    
    String publisher;

    /**
     * 响应式中日期的映射用 Instant 或者 LocalXxx
     */
    Instant createTime;
    
	/**
	* 一对一 关系 实体类
	*/
    TAuthor TAuthor;

}

复杂查询(一对一)实体类转换器

springdata r2dbc mapping 文档


/**
 * 告诉Spring Data 怎么封装 TBookAuthor 对象
 */
@ReadingConverter
public class BookAuthorConverter implements Converter<Row, TBookAuthor> {
    //1)、@Query 指定了 sql如何发送
    //2)、自定义 BookConverter 指定了 数据库返回的一 Row 数据,怎么封装成 TBook
    //3)、配置 R2dbcCustomConversions 组件,让 BookConverter 加入其中生效
    @Override
    public TBookAuthor convert(Row source) {

        if(source == null) return null;
        //自定义结果集的封装
        TBookAuthor tBook = new TBookAuthor();

        tBook.setId(source.get("id", Long.class));
        tBook.setPublisher(source.get("publisher", String.class));

        Long author_id = source.get("author_id", Long.class);
        tBook.setAuthorId(author_id);
        tBook.setCreateTime(source.get("create_time", Instant.class));


        //让 converter兼容更多的表结构处理
//        if (source.get("name",String.class)) {
            TAuthor tAuthor = new TAuthor();
            tAuthor.setId(author_id);
            tAuthor.setName(source.get("name", String.class));

            tBook.setTAuthor(tAuthor);
//        }
        return tBook;
    }
}

注册转换器


@EnableR2dbcRepositories //开启 R2dbc 仓库功能;jpa
@Configuration
public class R2DbcConfiguration {

    @Bean //替换容器中原来的
    @ConditionalOnMissingBean
    public R2dbcCustomConversions conversions(){

        //把我们的转换器加入进去; 效果新增了我们的 Converter
        return R2dbcCustomConversions.of(MySqlDialect.INSTANCE,new BookAuthorConverter());
    }
}

测试代码

@SpringBootTest
public class AppTest {

    @Autowired
    TBookMapper bookMapper;

    @Test
    void testCRUD() throws IOException {
//         bookMapper.findAll().subscribe(System.out::println);
//        bookMapper.findById(1L).subscribe(System.out::println);
        TBookAuthor block = bookMapper.authorBook(1L).block();
        System.out.println(block);
        //查询是全异步的, 需要阻塞一下
        System.in.read();


    }
 }

一对多关系

在这里插入图片描述

    @Test
    void oneToN() throws IOException {

//        databaseClient.sql("select a.id aid,a.name,b.* from t_author a  " +
//                "left join t_book b on a.id = b.author_id " +
//                "order by a.id")
//                .fetch()
//                .all(row -> {
//
//                })


        // 1~6
        // 1:false 2:false 3:false 4: true 8:true 5:false 6:false 7:false 8:true 9:false 10:false
        // [1,2,3]
        // [4,8]
        // [5,6,7]
        // [8]
        // [9,10]
        // bufferUntilChanged:
        // 如果下一个判定值比起上一个发生了变化就开一个新buffer保存,如果没有变化就保存到原buffer中

//        Flux.just(1,2,3,4,8,5,6,7,8,9,10)
//                .bufferUntilChanged(integer -> integer%4==0 )
//                .subscribe(list-> System.out.println("list = " + list));
        ; //自带分组


        Flux<TAuthor> flux = databaseClient.sql("select a.id aid,a.name,b.* from t_author a  " +
                        "left join t_book b on a.id = b.author_id " +
                        "order by a.id")
                .fetch()
                .all()
                .bufferUntilChanged(rowMap -> Long.parseLong(rowMap.get("aid").toString()))
                .map(list -> {
                    TAuthor tAuthor = new TAuthor();
                    Map<String, Object> map = list.get(0);
                    tAuthor.setId(Long.parseLong(map.get("aid").toString()));
                    tAuthor.setName(map.get("name").toString());


                    //查到的所有图书
                    List<TBook> tBooks = list.stream()
                            .map(ele -> {
                                TBook tBook = new TBook();

                                tBook.setId(Long.parseLong(ele.get("id").toString()));
                                tBook.setAuthorId(Long.parseLong(ele.get("author_id").toString()));
                                tBook.setTitle(ele.get("title").toString());
                                return tBook;
                            })
                            .collect(Collectors.toList());

                    tAuthor.setBooks(tBooks);
                    return tAuthor;
                });//Long 数字缓存 -127 - 127;// 对象比较需要自己写好equals方法



        flux.subscribe(tAuthor -> System.out.println("tAuthor = " + tAuthor));

        System.in.read();


    }

http://www.kler.cn/a/538306.html

相关文章:

  • Centos Ollama + Deepseek-r1+Chatbox运行环境搭建
  • AF3 drmsd函数解读
  • Jenkins数据备份到windows FTP服务器
  • 链表(LinkedList) 1
  • postgreSQL16.6源码安装
  • idea 如何使用deepseek 保姆级教程
  • 差分算法解析
  • w196Spring Boot高校教师科研管理系统设计与实现
  • 高速网络的未来:零拷贝Zero-Copy架构
  • 国产AI大模型DeepSeek的本地化部署
  • 数据留痕的方法
  • WordPress wp-recall插件存在SQL注入漏洞(CVE-2024-32709)
  • 响应式编程_04Spring 5 中的响应式编程技术栈_WebFlux 和 Spring Data Reactive
  • 【异常解决】在idea中提示 hutool 提示 HttpResponse used withoud try-with-resources statement
  • 线程安全面试题
  • 【C语言标准库函数】指数与对数函数:exp(), log(), log10()
  • google 多模态aistudio Stream Realtime体验
  • cursor指令工具
  • flask开发的网站,后端服务关闭后,可以找回之前的数据的吗
  • Vue全流程--Vue2路由
  • 【Leetcode 每日一题】63. 不同路径 II
  • 计算机组成原理(3)
  • Tengine配置负载均衡加健康检查
  • 【AI】人工智能与搜索引擎知识了解
  • 高效 MyBatis SQL 写法一
  • 【Vue】在Vue3中使用Echarts的示例 两种方法