当前位置: 首页 > article >正文

Gateway与WebFlux的整合

 WebFlux:HandlerMapping

HandlerMapping下主要分为2个分支:AbstractUrlHandlerMapping和AbstractHandlerMethodMapping,其中AbstractUrlHandlerMapping用于url与handler的匹配,AbstractHandlerMethodMapping用于HandlerMethod与handler的匹配,2者均继承于AbstractHandlerMapping,更详细的后续会单独开一篇文章。

AbstractHandlerMapping

public abstract class AbstractHandlerMapping extends ApplicationObjectSupport
		implements HandlerMapping, Ordered, BeanNameAware {
   
    	@Override
	public Mono<Object> getHandler(ServerWebExchange exchange) {
		return getHandlerInternal(exchange).map(handler -> {
			if (logger.isDebugEnabled()) {
				logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
			}
			ServerHttpRequest request = exchange.getRequest();
			if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
				CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
				CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
				config = (config != null ? config.combine(handlerConfig) : handlerConfig);
				if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
					return REQUEST_HANDLED_HANDLER;
				}
			}
			return handler;
		});
	}
    
    protected abstract Mono<?> getHandlerInternal(ServerWebExchange exchange);
}

可以看出这里的核心方法是getHandler,而内部的真实实现位于getHandlerInternal,这是个抽象方法并未实现。

WebFlux:HandlerAdapter

这里简单介绍下:

  • HandlerFunctionAdapter:支持HandlerFunction的实例,业务实现方法为HandlerFunction#handle
  • RequestMappingHandlerAdapter:这是日常业务开发用到最多的,对应的支持类型为HandlerMethod,业务方法为各自定义的方法,无具体的限制方法
  • SimpleHandlerAdapter:要求handler的类型为WebHandler或者其子类的实现类,业务实现方法为WebHandler#handle
  • WebSocketHandlerAdapter:要求handler的类型为WebSocketHandler或者其子类的实现类,业务实现方法为WebSocketHandler#handle,这里涉及到WebSocketService的处理,支持webSocket协议的处理

WebFlux:HandlerResultHandler

简单介绍下:

  • ResponseBodyResultHandler:支持类或者方法上有@ResponseBody注解的数据处理
  • ResponseEntityResultHandler:支持返回值是HttpEntity或其子类实例(不能是RequestEntity及其子类的实例)、HttpHeaders及其子类实例的数据处理
  • ServerResponseResultHandler:支持返回值是ServerResponse实例的数据处理
  • ViewResolutionResultHandler:支持CharSequence、Rendering、Model、Map、View、非简单属性及其子类实例的数据处理,这里的简单类型如下
    public static boolean isSimpleValueType(Class<?> type) {
    		return (Void.class != type && void.class != type &&
    				(ClassUtils.isPrimitiveOrWrapper(type) ||
    				Enum.class.isAssignableFrom(type) ||
    				CharSequence.class.isAssignableFrom(type) ||
    				Number.class.isAssignableFrom(type) ||
    				Date.class.isAssignableFrom(type) ||
    				Temporal.class.isAssignableFrom(type) ||
    				URI.class == type ||
    				URL.class == type ||
    				Locale.class == type ||
    				Class.class == type));
    	}
    

Gateway:GatewayAutoConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties
@AutoConfigureBefore({ HttpHandlerAutoConfiguration.class, WebFluxAutoConfiguration.class })
@AutoConfigureAfter({ GatewayLoadBalancerClientAutoConfiguration.class, GatewayClassPathWarningAutoConfiguration.class })
@ConditionalOnClass(DispatcherHandler.class)
public class GatewayAutoConfiguration {

    @Bean
	public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
		return new FilteringWebHandler(globalFilters);
	}
    
    @Bean
	public RoutePredicateHandlerMapping routePredicateHandlerMapping(
			FilteringWebHandler webHandler, RouteLocator routeLocator,
			GlobalCorsProperties globalCorsProperties, Environment environment) {
		return new RoutePredicateHandlerMapping(webHandler, routeLocator, globalCorsProperties, environment);
	}

}

这里主要标出主流程涉及到的类:

  • RoutePredicateHandlerMapping:负责路由匹配
  • FilteringWebHandler:负责过滤器的执行

Gateway:RoutePredicateHandlerMapping

public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {

    private final FilteringWebHandler webHandler;

    
    @Override
	protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
		// don't handle requests on management port if set and different than server port
		if (this.managementPortType == DIFFERENT && this.managementPort != null
				&& exchange.getRequest().getURI().getPort() == this.managementPort) {
			return Mono.empty();
		}
		exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

		return lookupRoute(exchange)
				// .log("route-predicate-handler-mapping", Level.FINER) //name this
				.flatMap((Function<Route, Mono<?>>) r -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isDebugEnabled()) {
						logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
					}
					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
					return Mono.just(webHandler);
				})
				.switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isTraceEnabled()) {
						logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
					}
				})));
	}

    
    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
		return this.routeLocator.getRoutes()
				// individually filter routes so that filterWhen error delaying is not a
				// problem
				.concatMap(route -> Mono.just(route).filterWhen(r -> {
					// add the current route we are testing
					exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
					return r.getPredicate().apply(exchange);
				})
				// instead of immediately stopping main flux due to error, log and swallow it
				.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
				.onErrorResume(e -> Mono.empty()))
				// .defaultIfEmpty() put a static Route not found
				// or .switchIfEmpty()
				// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
				.next()
				// TODO: error handling
				.map(route -> {
					if (logger.isDebugEnabled()) {
						logger.debug("Route matched: " + route.getId());
					}
					validateRoute(route, exchange);
					return route;
				});

		/*
		 * TODO: trace logging if (logger.isTraceEnabled()) {
		 * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
		 */
	}

}
lookupRoute方法通过AsyncPredicate选择出对应的路由,并将路由id记录到ServerWebExchange的属性gatewayPredicateRouteAttr中,getHandlerInternal中则移除gatewayPredicateRouteAttr属性,将整个路由放入ServerWebExchange的gatewayRoute属性中,最后返回FilteringWebHandler

Gateway:FilteringWebHandler

public class FilteringWebHandler implements WebHandler {

	protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class);

	private final List<GatewayFilter> globalFilters;

	public FilteringWebHandler(List<GlobalFilter> globalFilters) {
		this.globalFilters = loadFilters(globalFilters);
	}

	private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
		return filters.stream().map(filter -> {
			GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
			if (filter instanceof Ordered) {
				int order = ((Ordered) filter).getOrder();
				return new OrderedGatewayFilter(gatewayFilter, order);
			}
			return gatewayFilter;
		}).collect(Collectors.toList());
    
    public Mono<Void> handle(ServerWebExchange exchange) {
		Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
		List<GatewayFilter> gatewayFilters = route.getFilters();

		List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
		combined.addAll(gatewayFilters);
		// TODO: needed or cached?
		AnnotationAwareOrderComparator.sort(combined);

		if (logger.isDebugEnabled()) {
			logger.debug("Sorted gatewayFilterFactories: " + combined);
		}

		return new DefaultGatewayFilterChain(combined).filter(exchange);
	}
}
FilteringWebHandler实现了WebHandler接口作为当前路由的handler返回,而在WebFlux中,对应的handlerAdapter则为SimpleHandlerAdapter,这里会调用handle方法,即FilteringWebHandler#handle,该方法中将路由中定义的filter即GatewayFilter和全局filter即GlobalFilter进行拼装并排序,构建了DefaultGatewayFilterChain过滤器链条:
private static class DefaultGatewayFilterChain implements GatewayFilterChain {

    @Override
		public Mono<Void> filter(ServerWebExchange exchange) {
			return Mono.defer(() -> {
				if (this.index < filters.size()) {
					GatewayFilter filter = filters.get(this.index);
					DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
					return filter.filter(exchange, chain);
				}
				else {
					return Mono.empty(); // complete
				}
			});
		}

}

可以看出这里会逐个执行所有的过滤器逻辑

Gateway:默认全局过滤器

public class GatewayAutoConfiguration {
    
    @Bean
	@ConditionalOnEnabledGlobalFilter
	public AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter() {
		return new AdaptCachedBodyGlobalFilter();
	}

	@Bean
	@ConditionalOnEnabledGlobalFilter
	public RemoveCachedBodyFilter removeCachedBodyFilter() {
		return new RemoveCachedBodyFilter();
	}

	@Bean
	@ConditionalOnEnabledGlobalFilter
	public RouteToRequestUrlFilter routeToRequestUrlFilter() {
		return new RouteToRequestUrlFilter();
	}

	@Bean
	@ConditionalOnEnabledGlobalFilter
	public ForwardRoutingFilter forwardRoutingFilter(ObjectProvider<DispatcherHandler> dispatcherHandler) {
		return new ForwardRoutingFilter(dispatcherHandler);
	}

	@Bean
	@ConditionalOnEnabledGlobalFilter
	public ForwardPathFilter forwardPathFilter() {
		return new ForwardPathFilter();
	}


	@Bean
	@ConditionalOnEnabledGlobalFilter
	public WebsocketRoutingFilter websocketRoutingFilter(WebSocketClient webSocketClient, WebSocketService webSocketService,
			ObjectProvider<List<HttpHeadersFilter>> headersFilters) {
		return new WebsocketRoutingFilter(webSocketClient, webSocketService, headersFilters);
	}

	@Bean
	public WeightCalculatorWebFilter weightCalculatorWebFilter(ConfigurationService configurationService,
			ObjectProvider<RouteLocator> routeLocator) {
		return new WeightCalculatorWebFilter(routeLocator, configurationService);
	}

}

可以看出默认的全局过滤器均需手动去开启,WeightCalculatorWebFilter为GateFilter的实现,用于权重的计算,默认会配置


http://www.kler.cn/a/507920.html

相关文章:

  • RV1126+FFMPEG推流项目(6)视频码率及其码率控制方式
  • 02JavaWeb——JavaScript-Vue(项目实战)
  • Windows图形界面(GUI)-QT-C/C++ - QT 窗口属性
  • SpringBoot集成Mqtt服务实现消费发布和接收消费
  • 【机器学习:二十二、机器学习项目开发的技巧】
  • SpringMVC——原理简介
  • 1.3变革之力:Transformer 如何重塑深度学习的未来
  • 精选算法合集
  • 快慢指针问题
  • 【2024年华为OD机试】(B卷,100分)- 比赛 (Java JS PythonC/C++)
  • 隧道IP广播与紧急电话系统:提升隧道安全的关键技术
  • CanTp 笔记
  • 【微信小程序】5|我的页面 | 我的咖啡店-综合实训
  • 【PowerQuery专栏】PowerQuery 函数之CSV文件处理函数
  • 手机上做笔记的APP工具?有哪些好用的记笔记APP
  • 警惕IDEA 2024版重大Bug问题:LomBok失效、Gradle冲突、Spring Boot启动错误
  • 【Azure 架构师学习笔记】- Azure Function (2) --实操1
  • JVM直击重点
  • 在 Azure 100 学生订阅中新建 Ubuntu VPS 并通过 Docker 部署 pSQL 服务器
  • 加菲工具格式化XML:让数据呈现更清晰
  • Python 文字生成语言,保存为wav格式
  • SQL2000在win10上安装的方法
  • go语言zero框架中在线截图chromedp 设置超限的网页长度
  • 基于matlab的火焰高度求解
  • docker与部署微服务实战
  • Elasticsearch单机安装