Integrating Gateway with WebFlux
WebFlux:HandlerMapping
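HandlerMapping has two main branches: AbstractUrlHandlerMapping and AbstractHandlerMethodMapping. AbstractUrlHandlerMapping matches request URLs to handlers, while AbstractHandlerMethodMapping matches requests to HandlerMethod instances. Both extend AbstractHandlerMapping; a more detailed walkthrough will follow in a separate article.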
AbstractHandlerMapping
public abstract class AbstractHandlerMapping extends ApplicationObjectSupport
        implements HandlerMapping, Ordered, BeanNameAware {
    @Override
    public Mono<Object> getHandler(ServerWebExchange exchange) {
        return getHandlerInternal(exchange).map(handler -> {
            if (logger.isDebugEnabled()) {
                logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
            }
            ServerHttpRequest request = exchange.getRequest();
            if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
                CorsConfiguration config = (this.corsConfigurationSource != null
                        ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
                CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
                config = (config != null ? config.combine(handlerConfig) : handlerConfig);
                if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
                    return REQUEST_HANDLED_HANDLER;
                }
            }
            return handler;
        });
    }
    protected abstract Mono<?> getHandlerInternal(ServerWebExchange exchange);
}
As shown, the core method here is getHandler, while the actual lookup is delegated to getHandlerInternal, an abstract method that each subclass must implement.
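The relationship between getHandler and getHandlerInternal is a template method. A minimal sketch of that shape follows, under loud assumptions: Optional stands in for Mono, a plain String path stands in for ServerWebExchange, and SketchHandlerMapping and FixedPathMapping are hypothetical names that do not exist in Spring.

```java
import java.util.Optional;

// Simplified stand-in for AbstractHandlerMapping (hypothetical class).
// Optional replaces Mono and a String path replaces ServerWebExchange.
abstract class SketchHandlerMapping {

    // Template method: shared post-processing (logging here; CORS in the real class).
    public final Optional<Object> getHandler(String path) {
        return getHandlerInternal(path).map(handler -> {
            System.out.println("Mapped " + path + " to " + handler);
            return handler;
        });
    }

    // Hook: each subclass decides how a request maps to a handler.
    protected abstract Optional<Object> getHandlerInternal(String path);
}

// Hypothetical subclass that only knows a single path.
class FixedPathMapping extends SketchHandlerMapping {
    @Override
    protected Optional<Object> getHandlerInternal(String path) {
        if ("/hello".equals(path)) {
            return Optional.<Object>of("helloHandler");
        }
        // An empty result means "this mapping has no handler for the request".
        return Optional.empty();
    }
}
```

Calling new FixedPathMapping().getHandler("/hello") yields the handler, while any other path yields an empty result, which is how a mapping signals that another HandlerMapping should be tried.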
WebFlux:HandlerAdapter
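Here is a brief overview:
- HandlerFunctionAdapter: supports HandlerFunction instances; the business logic lives in HandlerFunction#handle
- RequestMappingHandlerAdapter: the one used most in day-to-day development; it supports HandlerMethod, and the business method is whatever method you define, with no fixed method to implement
- SimpleHandlerAdapter: requires the handler to be a WebHandler or an implementation of one of its subtypes; the business logic lives in WebHandler#handle
- WebSocketHandlerAdapter: requires the handler to be a WebSocketHandler or an implementation of one of its subtypes; the business logic lives in WebSocketHandler#handle. Processing goes through WebSocketService, which is what provides WebSocket protocol support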
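The adapters above are selected by asking each one whether it supports the handler that the mapping returned. The sketch below illustrates that selection only; every type in it (SketchAdapter, SketchWebHandler, SketchHandlerFunction, SketchDispatcher) is a hypothetical stand-in, and requests and responses are reduced to plain strings.

```java
import java.util.List;

// Hypothetical, simplified counterpart of HandlerAdapter.
interface SketchAdapter {
    boolean supports(Object handler);
    String handle(Object handler, String request);
}

// Hypothetical handler types standing in for WebHandler and HandlerFunction.
interface SketchWebHandler { String handle(String request); }
interface SketchHandlerFunction { String handle(String request); }

// Plays the role of SimpleHandlerAdapter: accepts only SketchWebHandler.
class WebHandlerAdapterSketch implements SketchAdapter {
    public boolean supports(Object handler) { return handler instanceof SketchWebHandler; }
    public String handle(Object handler, String request) {
        return ((SketchWebHandler) handler).handle(request);
    }
}

// Plays the role of HandlerFunctionAdapter: accepts only SketchHandlerFunction.
class HandlerFunctionAdapterSketch implements SketchAdapter {
    public boolean supports(Object handler) { return handler instanceof SketchHandlerFunction; }
    public String handle(Object handler, String request) {
        return ((SketchHandlerFunction) handler).handle(request);
    }
}

class SketchDispatcher {
    private final List<SketchAdapter> adapters;

    SketchDispatcher(List<SketchAdapter> adapters) { this.adapters = adapters; }

    // The first adapter that supports the handler wins.
    String dispatch(Object handler, String request) {
        for (SketchAdapter adapter : adapters) {
            if (adapter.supports(handler)) {
                return adapter.handle(handler, request);
            }
        }
        throw new IllegalStateException("No adapter for handler: " + handler);
    }
}
```

This is why a handler that implements WebHandler ends up in SimpleHandlerAdapter: the adapter is chosen by the handler's type, not by the mapping that produced it.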
WebFlux:HandlerResultHandler
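A brief overview:
- ResponseBodyResultHandler: handles results when the class or method is annotated with @ResponseBody
- ResponseEntityResultHandler: handles return values that are instances of HttpEntity or its subclasses (but not RequestEntity or its subclasses), or of HttpHeaders or its subclasses
- ServerResponseResultHandler: handles return values that are ServerResponse instances
- ViewResolutionResultHandler: handles CharSequence, Rendering, Model, Map, View, and any non-simple type (subclasses included). "Simple type" is defined as follows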
public static boolean isSimpleValueType(Class<?> type) {
    return (Void.class != type && void.class != type &&
            (ClassUtils.isPrimitiveOrWrapper(type) ||
            Enum.class.isAssignableFrom(type) ||
            CharSequence.class.isAssignableFrom(type) ||
            Number.class.isAssignableFrom(type) ||
            Date.class.isAssignableFrom(type) ||
            Temporal.class.isAssignableFrom(type) ||
            URI.class == type ||
            URL.class == type ||
            Locale.class == type ||
            Class.class == type));
}
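To get a feel for which types count as simple, here is a self-contained approximation of the check. It is a sketch, not Spring's code: SimpleTypeSketch is a hypothetical class, and the WRAPPERS set is my stand-in for ClassUtils.isPrimitiveOrWrapper.

```java
import java.net.URI;
import java.net.URL;
import java.time.temporal.Temporal;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

class SimpleTypeSketch {

    // Assumption: stands in for ClassUtils.isPrimitiveOrWrapper (wrapper half).
    private static final Set<Class<?>> WRAPPERS = new HashSet<>(Arrays.asList(
            Boolean.class, Byte.class, Character.class, Short.class,
            Integer.class, Long.class, Float.class, Double.class));

    static boolean isSimpleValueType(Class<?> type) {
        // void and Void are excluded up front, as in the original method.
        return Void.class != type && void.class != type
                && (type.isPrimitive() || WRAPPERS.contains(type)
                || Enum.class.isAssignableFrom(type)
                || CharSequence.class.isAssignableFrom(type)
                || Number.class.isAssignableFrom(type)
                || Date.class.isAssignableFrom(type)
                || Temporal.class.isAssignableFrom(type)
                || URI.class == type || URL.class == type
                || Locale.class == type || Class.class == type);
    }
}
```

With this definition String, int and java.time.LocalDate are simple, while java.util.Map or an ordinary POJO is not, so the latter fall into the "non-simple type" case listed for ViewResolutionResultHandler.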
Gateway:GatewayAutoConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties
@AutoConfigureBefore({ HttpHandlerAutoConfiguration.class, WebFluxAutoConfiguration.class })
@AutoConfigureAfter({ GatewayLoadBalancerClientAutoConfiguration.class,
        GatewayClassPathWarningAutoConfiguration.class })
@ConditionalOnClass(DispatcherHandler.class)
public class GatewayAutoConfiguration {
    @Bean
    public FilteringWebHandler filteringWebHandler(List<GlobalFilter> globalFilters) {
        return new FilteringWebHandler(globalFilters);
    }
    @Bean
    public RoutePredicateHandlerMapping routePredicateHandlerMapping(
            FilteringWebHandler webHandler, RouteLocator routeLocator,
            GlobalCorsProperties globalCorsProperties, Environment environment) {
        return new RoutePredicateHandlerMapping(webHandler, routeLocator, globalCorsProperties, environment);
    }
}
Only the classes involved in the main flow are shown here:
- RoutePredicateHandlerMapping: responsible for route matching
- FilteringWebHandler: responsible for executing the filters
Gateway:RoutePredicateHandlerMapping
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
    private final FilteringWebHandler webHandler;
    @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // don't handle requests on management port if set and different than server port
        if (this.managementPortType == DIFFERENT && this.managementPort != null
                && exchange.getRequest().getURI().getPort() == this.managementPort) {
            return Mono.empty();
        }
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
        return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }
                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    return Mono.just(webHandler);
                })
                .switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    }
    protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
        return this.routeLocator.getRoutes()
                // individually filter routes so that filterWhen error delaying is not a
                // problem
                .concatMap(route -> Mono.just(route).filterWhen(r -> {
                    // add the current route we are testing
                    exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                    return r.getPredicate().apply(exchange);
                })
                        // instead of immediately stopping main flux due to error, log and swallow it
                        .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
                        .onErrorResume(e -> Mono.empty()))
                // .defaultIfEmpty() put a static Route not found
                // or .switchIfEmpty()
                // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
                .next()
                // TODO: error handling
                .map(route -> {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Route matched: " + route.getId());
                    }
                    validateRoute(route, exchange);
                    return route;
                });
        /*
         * TODO: trace logging if (logger.isTraceEnabled()) {
         * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
         */
    }
}
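The lookupRoute method uses each route's AsyncPredicate to pick the first matching route, recording the id of the route under test in the ServerWebExchange attribute gatewayPredicateRouteAttr. A predicate that throws is logged and skipped rather than aborting the lookup. getHandlerInternal then removes the gatewayPredicateRouteAttr attribute, stores the whole matched Route in the gatewayRoute attribute of the ServerWebExchange, and finally returns the FilteringWebHandler.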
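The lookup logic can be sketched without Reactor to make the control flow easier to follow. This is an illustration under assumptions: a Map of attributes stands in for ServerWebExchange, a synchronous Predicate stands in for AsyncPredicate, and SketchRoute, RouteLookupSketch and the short attribute keys are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical, simplified Route: an id plus a synchronous predicate.
class SketchRoute {
    final String id;
    final Predicate<Map<String, Object>> predicate;

    SketchRoute(String id, Predicate<Map<String, Object>> predicate) {
        this.id = id;
        this.predicate = predicate;
    }
}

class RouteLookupSketch {
    // Shortened keys; the real constants are defined in ServerWebExchangeUtils.
    static final String PREDICATE_ROUTE_ATTR = "gatewayPredicateRouteAttr";
    static final String ROUTE_ATTR = "gatewayRoute";

    // First route whose predicate matches wins; a failing predicate is skipped.
    static Optional<SketchRoute> lookupRoute(List<SketchRoute> routes, Map<String, Object> attrs) {
        for (SketchRoute route : routes) {
            attrs.put(PREDICATE_ROUTE_ATTR, route.id); // route currently under test
            try {
                if (route.predicate.test(attrs)) {
                    return Optional.of(route);
                }
            } catch (RuntimeException e) {
                // Log and swallow, mirroring doOnError + onErrorResume.
                System.err.println("Error applying predicate for route: " + route.id);
            }
        }
        return Optional.empty();
    }

    // Returns a placeholder handler name when a route matched.
    static Optional<String> getHandlerInternal(List<SketchRoute> routes, Map<String, Object> attrs) {
        Optional<SketchRoute> match = lookupRoute(routes, attrs);
        attrs.remove(PREDICATE_ROUTE_ATTR);             // removed on match and on no match
        match.ifPresent(r -> attrs.put(ROUTE_ATTR, r)); // hand the route to the web handler
        return match.map(r -> "filteringWebHandler");
    }
}
```

Note that route order matters: because the first match wins, a broad predicate placed before a narrow one will shadow it.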
Gateway:FilteringWebHandler
public class FilteringWebHandler implements WebHandler {
    protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class);
    private final List<GatewayFilter> globalFilters;
    public FilteringWebHandler(List<GlobalFilter> globalFilters) {
        this.globalFilters = loadFilters(globalFilters);
    }
    // Wrap every GlobalFilter as a GatewayFilter, preserving its order if it has one
    private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
        return filters.stream().map(filter -> {
            GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
            if (filter instanceof Ordered) {
                int order = ((Ordered) filter).getOrder();
                return new OrderedGatewayFilter(gatewayFilter, order);
            }
            return gatewayFilter;
        }).collect(Collectors.toList());
    }
    public Mono<Void> handle(ServerWebExchange exchange) {
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        List<GatewayFilter> gatewayFilters = route.getFilters();
        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        combined.addAll(gatewayFilters);
        // TODO: needed or cached?
        AnnotationAwareOrderComparator.sort(combined);
        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: " + combined);
        }
        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }
}
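FilteringWebHandler implements the WebHandler interface and is returned as the handler for the current route. In WebFlux the matching HandlerAdapter is therefore SimpleHandlerAdapter, which calls the handle method, that is, FilteringWebHandler#handle. This method merges the filters defined on the route (GatewayFilter) with the global filters (GlobalFilter), sorts them, and builds the DefaultGatewayFilterChain filter chain: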
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
    // fields (filters, index) and constructors omitted for brevity
    @Override
    public Mono<Void> filter(ServerWebExchange exchange) {
        return Mono.defer(() -> {
            if (this.index < filters.size()) {
                GatewayFilter filter = filters.get(this.index);
                DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
                return filter.filter(exchange, chain);
            }
            else {
                return Mono.empty(); // complete
            }
        });
    }
}
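As shown, the filters run one at a time in sorted order: each filter receives a chain positioned at the next index, and the chain completes once the index passes the last filter.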
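The merge, sort and index-based chaining can be sketched synchronously. The names below (SketchFilter, SketchChain, FilterChainSketch) are hypothetical, a list of strings replaces ServerWebExchange so the execution order is visible, and every filter carries an explicit order, which is a simplification of what AnnotationAwareOrderComparator does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, synchronous counterpart of GatewayFilter.
interface SketchFilter {
    void filter(List<String> trace, SketchChain chain);
    int order();
}

// Immutable chain: each step creates a new chain pointing at the next index.
class SketchChain {
    private final List<SketchFilter> filters;
    private final int index;

    SketchChain(List<SketchFilter> filters) { this(filters, 0); }

    private SketchChain(List<SketchFilter> filters, int index) {
        this.filters = filters;
        this.index = index;
    }

    void filter(List<String> trace) {
        if (index < filters.size()) {
            filters.get(index).filter(trace, new SketchChain(filters, index + 1));
        }
        // Otherwise the end of the chain is reached and nothing is left to run.
    }
}

class FilterChainSketch {

    // Builds a filter that records when it runs before and after the rest of the chain.
    static SketchFilter named(String name, int order) {
        return new SketchFilter() {
            public void filter(List<String> trace, SketchChain chain) {
                trace.add("pre:" + name);
                chain.filter(trace);
                trace.add("post:" + name);
            }
            public int order() { return order; }
        };
    }

    // Mirrors FilteringWebHandler#handle: merge, sort by order, run the chain.
    static List<String> handle(List<SketchFilter> globalFilters, List<SketchFilter> routeFilters) {
        List<SketchFilter> combined = new ArrayList<>(globalFilters);
        combined.addAll(routeFilters);
        combined.sort(Comparator.comparingInt(SketchFilter::order));
        List<String> trace = new ArrayList<>();
        new SketchChain(combined).filter(trace);
        return trace;
    }
}
```

Two points are worth noticing. Global and route filters are interleaved purely by order, so a route filter with a lower order runs before a global one. And a filter that never calls chain.filter short-circuits everything after it.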
Gateway:Default global filters
public class GatewayAutoConfiguration {
    @Bean
    @ConditionalOnEnabledGlobalFilter
    public AdaptCachedBodyGlobalFilter adaptCachedBodyGlobalFilter() {
        return new AdaptCachedBodyGlobalFilter();
    }
    @Bean
    @ConditionalOnEnabledGlobalFilter
    public RemoveCachedBodyFilter removeCachedBodyFilter() {
        return new RemoveCachedBodyFilter();
    }
    @Bean
    @ConditionalOnEnabledGlobalFilter
    public RouteToRequestUrlFilter routeToRequestUrlFilter() {
        return new RouteToRequestUrlFilter();
    }
    @Bean
    @ConditionalOnEnabledGlobalFilter
    public ForwardRoutingFilter forwardRoutingFilter(ObjectProvider<DispatcherHandler> dispatcherHandler) {
        return new ForwardRoutingFilter(dispatcherHandler);
    }
    @Bean
    @ConditionalOnEnabledGlobalFilter
    public ForwardPathFilter forwardPathFilter() {
        return new ForwardPathFilter();
    }
    @Bean
    @ConditionalOnEnabledGlobalFilter
    public WebsocketRoutingFilter websocketRoutingFilter(WebSocketClient webSocketClient,
            WebSocketService webSocketService,
            ObjectProvider<List<HttpHeadersFilter>> headersFilters) {
        return new WebsocketRoutingFilter(webSocketClient, webSocketService, headersFilters);
    }
    @Bean
    public WeightCalculatorWebFilter weightCalculatorWebFilter(ConfigurationService configurationService,
            ObjectProvider<RouteLocator> routeLocator) {
        return new WeightCalculatorWebFilter(routeLocator, configurationService);
    }
}
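As shown, each default global filter is guarded by @ConditionalOnEnabledGlobalFilter, so it can be switched on or off individually through configuration. As far as I can tell from the Gateway sources, such a filter is registered unless its enabled property is explicitly set to false, so nothing has to be turned on by hand. WeightCalculatorWebFilter is a WebFilter implementation (not a GatewayFilter) used for weight calculation; it carries no such condition and is always configured.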
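The "on unless switched off" behaviour of such a condition can be sketched with plain java.util.Properties. Everything here is an assumption for illustration: GlobalFilterToggleSketch is a hypothetical class, and the key format spring.cloud.gateway.global-filter.<name>.enabled should be verified against the Gateway version you run.

```java
import java.util.Properties;

class GlobalFilterToggleSketch {

    // Assumed key prefix; check it against your Gateway version's documentation.
    static final String PREFIX = "spring.cloud.gateway.global-filter.";

    // A filter counts as enabled when the property is missing or set to "true".
    static boolean isEnabled(Properties props, String normalizedName) {
        String value = props.getProperty(PREFIX + normalizedName + ".enabled", "true");
        return Boolean.parseBoolean(value);
    }
}
```

With an empty Properties object, isEnabled(props, "forward-path") is true; after setting spring.cloud.gateway.global-filter.forward-path.enabled to false it becomes false, which in the real auto-configuration would mean the corresponding bean is not registered.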