当前位置: 首页 > article >正文

响应式编程_03响应式编程在Netflix Hystrix 、Spring Cloud Gateway、Spring WebFlux中的应用

文章目录

  • 概述
  • 响应式编程在主流开源框架中的应用
    • Netflix Hystrix 中的滑动窗口 (基于RxJava框架)
      • `HystrixCircuitBreaker` 如何动态获取系统运行时的各项数据
      • 如何实现滑动窗口?
    • 小结
    • Spring Cloud Gateway 中的过滤器Filter (基于Project Reactor 框架)
      • 使用全局过滤器来对所有 HTTP 请求进行拦截
      • 自定义用于处理响应的Filter
    • Spring WebFlux 中的请求处理流程
  • 小结

在这里插入图片描述


概述

本质上,我们可以认为响应式编程并不仅仅是一种编程技术,而更是一种架构设计的系统方法,因此可以应用于任何地方。它既可以用于简单的 Web 应用系统,也可以用于大型企业解决方案。当然,对于响应式数据流,我们也完全可以基于它来构建流式系统或大数据系统。

数据流处理是响应式编程的一大应用场景。流式系统的主要特点是低延迟和高吞吐量。对于这类系统,大多数数据是从服务器端传出的,因此客户端扮演消费者的角色。这个时候,通过使用非阻塞式通信可以确保资源得到高效的利用,从而实现低延迟和高吞吐量。流式系统的表现形式也可以有很多,日常的日志埋点和分析、服务运行时的状态采集等都属于这种类型。

针对高并发流量,通常涉及大量的 I/O 操作。相较于传统的同步阻塞式 I/O 模型,响应式编程所具备的异步非阻塞式 I/O 模型非常适合应对处理高并发流量的业务场景。这类场景中比较典型的一种表现形式就是微服务架构中的 API 网关,因为网关的作用就是用来响应来自前端系统的流量并将其转发到后端服务。

讲到微服务架构,如何构建一个具有异步非阻塞式的请求处理流程的 Web 服务也是核心诉求,我们需要高效处理跨服务之间的网络请求。针对这种场景,响应式编程及其相关技术体系同样也是一种非常有效的解决方案 。


响应式编程在主流开源框架中的应用

响应式编程在日常开发过程中日益得到广泛的应用,结合上文所分析的三种典型应用场景,这里我们以对应的 Netflix Hystrix、Spring Cloud Gateway 以及 Spring WebFlux 这三款主流的开源框架为例,解析这些框架背后所应用的响应式编程技术。


Netflix Hystrix 中的滑动窗口 (基于RxJava框架)

在 Spring Cloud 微服务开发框架中,存在一个 Spring Cloud Netflix Hystrix 组件,该组件基于 Netflix Hystrix 实现了服务熔断功能。Netflix Hystrix 是 Netflix 开源的一款容错库,使用了 HystrixCircuitBreaker 类来实现熔断器。该类通过一个 circuitOpen 状态位控制着整个熔断判断流程,而这个状态位本身的状态值则取决于系统目前的执行数据和健康指标。

HystrixCircuitBreaker 如何动态获取系统运行时的各项数据

那么,HystrixCircuitBreaker 如何动态获取系统运行时的各项数据呢?

这里就使用到了一个 HealthCountsStream 类,从命名上不难看出,这就是一种数据流。HealthCountsStream 在设计上采用了一种特定的机制,即滑动窗口(Rolling Window)机制,而 Hystrix 在实现这一机制时大量采用了数据流处理方面的技术以及 RxJava 这个响应式编程框架。

Hystrix 以秒为单位来统计系统中所有请求的处理情况,然后每次取最近 10 秒的数据来进行计算。如果失败率超过一定阈值,就进行熔断。这里的 10 秒就是一个滑动窗口,参考其官网的一幅图,如下所示

在这里插入图片描述

上图演示了 Hystrix 滑动窗口策略,把 10 秒时间拆分成了 10 个格子,我们把这种格子称为桶 Bucket。每个桶中的数据就是这一秒中所处理的请求数量,并针对处理结果的状态做了分类。然后每当收集好一个新的桶后,就会丢弃掉最旧的一个桶,所以窗口是滑动的


如何实现滑动窗口?

我们转换一下思路,可以把系统运行时所产生的所有数据都视为一个个的事件,这样滑动窗口中每个桶的数据都来自源源不断的事件。同时,对于这些生成的事件,我们通常需要对其进行转换以便进行后续的操作。这两点构成了实现滑动窗口的设计目标和方法。

在技术实现的选型上,Hystrix 采用了基于响应式编程思想的 RxJava。与其他响应式编程框架一样,RxJava 同样实现了上一讲中介绍的响应式流规范。使用 RxJava 的一大好处是可以通过 RxJava 的一系列操作符来实现滑动窗口,包括 window、flatMap 和 reduce 等。其中

  • window 操作符是把当前流中的元素收集到另外的流序列;
  • flatMap 操作符把流中的每个元素转换成一个流,再把转换之后得到的所有流中的元素进行合并。
  • 而后 reduce 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的流。

在 Hystrix 中,HealthCountsStream 的子类 BucketedCounterStream 将基础数据事件流汇总成 Bucket,如下所示

this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call() {
                return inputEventStream
                        .observe()
	// 使用window操作符收集一个Bucket时间内的数据
.window(bucketSizeInMs, TimeUnit.MILLISECONDS) 
// 将每个window内聚集起来的事件集合汇总成Bucket
.flatMap(reduceBucketToSummary).startWith(emptyEventCountsToStart);                       }
	        });

可以看到,这里分别使用了前面介绍的 window 和 flatMap 操作符来完成 Bucket 的构建。请注意,该方法返回的是一个 Observable<Bucket> 对象。在 RxJava 中,Observable 代表的就是一个无限流对象。

我们再来看 BucketedCounterStream 的子类 BucketedRollingCounterStream 类,该类的构造函数中同样存在一个类似的方法,如下所示(为了避免过于复杂,裁剪了部分代码)。

this.sourceStream = bucketedStream
	 //将N个Bucket进行汇总
	.window(numBuckets, 1)
	//汇总成一个窗口
	.flatMap(reduceWindowToSummary)//添加背压控制
	.onBackpressureDrop();

上述方法中基于父类 BucketedCounterStream 已经汇总的 bucketedStream 进行开窗处理,从而获取一个 sourceStream,这个 sourceStream 就是滑动窗口的最终形态。最后的 onBackpressureDrop() 语句是 RxJava 中提供的一种背压机制。我们在响应式编程_02基本概念:背压机制 Backpressure中讨论了背压机制的概念和作用,而在这里就看到了这一机制的具体应用。


小结

Hystrix 巧妙地运用了 RxJava 中的 window、flatMap 等操作符来将单位窗口时间内的事件,以及将一个窗口大小内的 Bucket 聚集到一起形成滑动窗口,并基于滑动窗口集成指标数据。这个设计思想非常巧妙,值得我们深入研究并对基于流的处理过程加以尝试和应用。


Spring Cloud Gateway 中的过滤器Filter (基于Project Reactor 框架)

Spring Cloud Gateway 是 Spring Cloud 微服务开发框架中的另一个核心组件,是 Spring 官方自己开发的一款 API 网关。

在技术体系上,Spring Cloud Gateway 基于最新的 Spring 5 和 Spring Boot 2,以及用于响应式编程的 Project Reactor 框架,提供的是响应式、非阻塞式 I/O 模型。所以性能上比 Netflix 中的 Zuul 网关要更胜一筹。

Spring Cloud Gateway 中的核心概念就是过滤器(Filter),围绕过滤器的请求处理流程如下图所示

在这里插入图片描述

过滤器用于在响应 HTTP 请求之前或之后修改请求本身及对应的响应结果。Spring Cloud Gateway 中提供了一个全局过滤器(GlobalFilter)的概念,对所有路由都生效。

使用全局过滤器来对所有 HTTP 请求进行拦截

我们来演示一下如何使用全局过滤器来对所有 HTTP 请求进行拦截,具体做法是实现 GlobalFilter 接口,示例代码如下所示。

@Configuration
public class JWTAuthFilter implements GlobalFilter {
 
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest.Builder builder = exchange.getRequest().mutate();
        builder.header("Authorization","Token");
        return chain.filter(exchange.mutate().request(builder.build()).build());
    }
}

以上代码展示了如何利用全局过滤器在所有的请求中添加 Header 的实现方法。在这个示例中,我们对所有经过 API 网关的 HTTP 请求添加了一个消息头,用来设置与访问 Token 相关的安全认证信息。

这里的 filter 方法返回了一个 Mono 对象,这是在响应式编程框架 Project Reactor 中代表单个返回值的流式对象


自定义用于处理响应的Filter

我们再来看一个使用过滤器的场景。以下代码展示了一个用于处理响应的 PostGatewayFilter 的实现方式,其中首先继承一个 AbstractGatewayFilterFactory 类,然后可以通过覆写 apply 方法来提供针对 ServerHttpResponse 对象的任何操作。

public class PostGatewayFilterFactory extends AbstractGatewayFilterFactory {
 
    public PostGatewayFilterFactory() {
        super(Config.class);
    }
 
    public GatewayFilter apply() {
        return apply(o -> {
        });
    }
 
    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
          return chain.filter(exchange).then(Mono.fromRunnable(() -> {
              ServerHttpResponse response = exchange.getResponse();

              //针对Response的各种处理
            }));
          };
    }
 
    public static class Config {
    }
}

同样,注意这里使用了 Reactor 框架中的 then 操作符,该操作符的含义是等到上一个操作完成再做下一个。所以,我们在过滤器链执行完对 exchange 对象的过滤之后,再通过 Mono.fromRunnable 方法创建一个新的线程,可以在这里添加各种针对 Response 对象的处理过程。


Spring WebFlux 中的请求处理流程

Spring WebFlux 是 Spring 5 中引入的全新的响应式 Web 服务开发框架。针对涉及大量 I/O 操作的服务化架构,WebFlux 也是一种非常有效的解决方案,能够在复杂的流程中集成非阻塞、异步通信机制,从而实现高效处理跨服务之间的网络请求。

在 WebFlux 中,对 HTTP 请求的处理过程涉及了 HandlerMapping、HandlerAdapter、HandlerResultHandler 类之间的交互,整个流程如下图所示。

在这里插入图片描述

我们直接来看用于完成上图流程的 Handle 方法定义,该方法实现了流式处理请求机制,如下所示。

public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
 
        return Flux.fromIterable(this.handlerMappings)
                //从handlerMapping这个map中获取HandlerMapping
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                //如果没有找到HandlerMapping,则抛出异常
                .switchIfEmpty(createNotFoundError())
                //触发HandlerAdapter的handle方法
                .flatMap(handler -> invokeHandler(exchange, handler))
                //触发HandlerResultHandler 的handleResult方法
                .flatMap(result -> handleResult(exchange, result));
}

在这个核心方法中,我们看到了 concatMap、switchIfEmpty 和 flatMap 等响应式操作符。


小结

这里通过理论联系实际,讨论了响应式编程的具体应用场景,发现响应式编程技术已经应用到了日常开发的很多开源框架中,包括 Netflix Hystrix、Spring Cloud Gateway 以及 Spring WebFlux 等。这些框架在分布式系统和微服务架构中得到了广泛的应用,而响应式编程在这些框架中发挥着重要作用。

在这里插入图片描述


http://www.kler.cn/a/533241.html

相关文章:

  • 深入解析:Python 爬虫高级技巧与实战应用
  • 哈夫曼树原理及其C语言实现
  • Spring @Lazy:延迟初始化,为应用减负
  • 计算机基础知识(第二篇)
  • 63.视频推荐的算法|Marscode AI刷题
  • 请解释 Java 中的 IO 和 NIO 的区别,以及 NIO 如何实现多路复用?
  • java进阶文章链接
  • Page Assist - 本地Deepseek模型 Web UI 的安装和使用
  • SpringBoot开发(四)SpringBoot配置文件
  • 【数据可视化-14】Boss招聘数据分析岗位的可视化分析
  • C++11详解(二) -- 引用折叠和完美转发
  • 使用C++构建一个优先级队列
  • 代理模式的作用
  • 从 DeepSeek R1 中提取数学推理数据,使用 CAMEL
  • 华为手机nova9,鸿蒙系统版本4.2.0.159,智慧助手.今天版本是14.x,如何卸载智慧助手.今天?
  • Python进行模型优化与调参
  • SGlang 专为大模型设计的高效服务框架
  • DRGDIP 2.0时代下基于PostgreSQL的成本管理实践与探索(上)
  • AI透明化与全球政治格局的发展:如何避免AI被人为操控
  • 电商用户画像数据可视化分析
  • 基于MODIS/Landsat/Sentinel/国产卫星遥感数据与DSSAT作物模型同化的作物产量估算
  • 使用 Redisson 实现分布式并发限流
  • Spring 面试题【每日20道】【其三】
  • 力扣73矩阵置零
  • 【Leetcode 每日一题】541. 反转字符串 II
  • Vue3 完整学习笔记 - 第二部分