Spring Cloud Gateway 源码
Spring Cloud Gateway 架构图
按照以上架构图,请求的处理流程:
1.客户端请求发送到网关 DispatcherHandler
2.网关通过 HandlerMapping 找到相应的 WebHandler
3.WebHandler生成FilterChain过滤器链执行所有的过滤器
4.返回Response结果
自动装配类GatewayAutoConfiguration
看几个关键的Bean
路由定义加载
PropertiesRouteDefinitionLocator
CompositeRouteDefinitionLocator
RouteDefinitionRouteLocator
HandlerMapping类
RoutePredicateHandlerMapping
WebHandler类 核心逻辑
FilteringWebHandler
RoutePredicateFactory 路由断言工厂,实现类很多,这里只列举一部分
WeightRoutePredicateFactory // 权重(权重的计算由 WeightCalculatorWebFilter 这个 WebFilter 完成,它本身不是断言工厂)
BeforeRoutePredicateFactory // 时间
HeaderRoutePredicateFactory // 请求头匹配
HostRoutePredicateFactory // Host请求头(域名)匹配
MethodRoutePredicateFactory // 请求方法
CookieRoutePredicateFactory // cookie匹配
......
GatewayFilterFactory 网关过滤器工厂,实现类很多,这里只列举一部分
AddRequestHeaderGatewayFilterFactory // 添加请求头过滤器
MapRequestHeaderGatewayFilterFactory // 请求头值替换过滤器
AddRequestParameterGatewayFilterFactory // 添加请求参数过滤器
AddResponseHeaderGatewayFilterFactory // 添加响应头过滤器
ModifyRequestBodyGatewayFilterFactory // 修改请求实体过滤器
......
核心入口
DispatcherHandler.handle(ServerWebExchange exchange) 方法
/**
 * Handles a request: picks the first handler produced by the registered handler
 * mappings, invokes it via {@code invokeHandler} and passes the outcome to
 * {@code handleResult}.
 *
 * @param exchange the current server exchange
 * @return a Mono that completes when handling is done; the result of
 *         {@code createNotFoundError()} when no mappings are configured or none of
 *         them yields a handler; the result of {@code handlePreFlight} for CORS
 *         pre-flight requests
 */
public Mono<Void> handle(ServerWebExchange exchange) {
    if (this.handlerMappings == null) {
        return createNotFoundError();
    }
    if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
        return handlePreFlight(exchange);
    }
    // Query the mappings one after another and keep only the first handler emitted.
    Mono<HandlerResult> invocation = Flux.fromIterable(this.handlerMappings)
            .concatMap(mapping -> mapping.getHandler(exchange))
            .next()
            .switchIfEmpty(createNotFoundError())
            // Execute the handler through a supporting HandlerAdapter.
            .flatMap(handler -> invokeHandler(exchange, handler));
    // Process whatever result the handler produced.
    return invocation.flatMap(result -> handleResult(exchange, result));
}
获取处理器 mapping.getHandler(exchange)
通过RoutePredicateHandlerMapping 找到 FilteringWebHandler
最终获取到的handler是 private final FilteringWebHandler webHandler;
/**
 * Resolves the handler for the given exchange by looking up a matching route.
 * <p>When a route is found it is stored under GATEWAY_ROUTE_ATTR and the shared
 * {@code webHandler} is emitted. When no route is found, the temporary
 * GATEWAY_PREDICATE_ROUTE_ATTR attribute is removed and the miss is trace-logged.
 *
 * @param exchange the current server exchange
 * @return a Mono emitting {@code webHandler} on a route match; an empty Mono for
 *         requests arriving on a distinct management port
 */
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
// Record the name of this mapping on the exchange
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
// Route matched: drop the temporary predicate attribute before publishing the route
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
// Expose the matched route on the exchange (FilteringWebHandler reads GATEWAY_ROUTE_ATTR)
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
// No route matched: clean up the temporary attribute and log; the Runnable emits no value
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
}
执行处理器 invokeHandler(exchange, handler)
/**
 * Dispatches the resolved handler to the first registered HandlerAdapter that
 * reports support for it.
 *
 * @param exchange the current server exchange
 * @param handler the handler object to execute
 * @return the supporting adapter's result; an empty Mono when the response status
 *         is already FORBIDDEN; an error Mono carrying IllegalStateException when
 *         no adapter supports the handler
 */
private Mono<HandlerResult> invokeHandler(ServerWebExchange exchange, Object handler) {
    boolean forbidden = ObjectUtils.nullSafeEquals(exchange.getResponse().getStatusCode(), HttpStatus.FORBIDDEN);
    if (forbidden) {
        return Mono.empty(); // CORS rejection
    }
    // Locate the first adapter able to run this handler, if any.
    HandlerAdapter supporting = null;
    if (this.handlerAdapters != null) {
        for (HandlerAdapter candidate : this.handlerAdapters) {
            if (candidate.supports(handler)) {
                supporting = candidate;
                break;
            }
        }
    }
    if (supporting == null) {
        return Mono.error(new IllegalStateException("No HandlerAdapter: " + handler));
    }
    return supporting.handle(exchange, handler);
}
最终是由SimpleHandlerAdapter执行handler
具体的处理逻辑 webHandler.handle(exchange)
/**
 * HandlerAdapter for handlers that are WebHandler instances. It delegates straight
 * to {@code WebHandler.handle} and never emits a HandlerResult.
 */
public class SimpleHandlerAdapter implements HandlerAdapter {

    /**
     * @param handler the candidate handler
     * @return true when the handler's runtime class is assignable to WebHandler
     */
    @Override
    public boolean supports(Object handler) {
        Class<?> handlerType = handler.getClass();
        return WebHandler.class.isAssignableFrom(handlerType);
    }

    /**
     * Runs the handler as a WebHandler and completes empty once it has finished.
     *
     * @param exchange the current server exchange
     * @param handler the handler to run; cast to WebHandler
     * @return a Mono that emits no HandlerResult
     */
    @Override
    public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
        return ((WebHandler) handler).handle(exchange).then(Mono.empty());
    }
}
最终进入FilteringWebHandler.handle(ServerWebExchange exchange)方法
/**
 * Builds the filter chain for the matched route and runs the exchange through it.
 * The chain consists of the global filters plus the route's own filters, sorted
 * with AnnotationAwareOrderComparator.
 *
 * @param exchange the current server exchange; must carry GATEWAY_ROUTE_ATTR
 * @return a Mono completing when the filter chain has finished
 */
public Mono<Void> handle(ServerWebExchange exchange) {
    Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
    // Start from the global filters, then append the filters configured on this route.
    List<GatewayFilter> chainFilters = new ArrayList<>(this.globalFilters);
    chainFilters.addAll(route.getFilters());
    // Sort the merged list by order.
    AnnotationAwareOrderComparator.sort(chainFilters);
    if (logger.isDebugEnabled()) {
        logger.debug("Sorted gatewayFilterFactories: " + chainFilters);
    }
    // Execute the filter chain.
    DefaultGatewayFilterChain filterChain = new DefaultGatewayFilterChain(chainFilters);
    return filterChain.filter(exchange);
}
执行每个过滤器之前,都会先封装一个索引+1的新过滤器链并传给该过滤器,过滤器内部调用 chain.filter 后继续执行后面的过滤器
/**
 * Runs the filter at the current index, handing it a new chain positioned at the
 * following index. Evaluation is deferred until subscription.
 *
 * @param exchange the current server exchange
 * @return the current filter's Mono, or an empty Mono once every filter has run
 */
public Mono<Void> filter(ServerWebExchange exchange) {
    return Mono.defer(() -> {
        if (this.index >= filters.size()) {
            return Mono.empty(); // complete
        }
        GatewayFilter current = filters.get(this.index);
        DefaultGatewayFilterChain next = new DefaultGatewayFilterChain(this, this.index + 1);
        return current.filter(exchange, next);
    });
}
过滤器链执行
网关会在各种过滤器处理后(根据配置的过滤器修改请求信息,重写路径等),最后转发到对应服务,核心就是通过NettyRoutingFilter来实现的
NettyRoutingFilter会代理发送http请求到对应的服务器,最终返回结果
/**
 * Proxies the request to the URI stored under GATEWAY_REQUEST_URL_ATTR when its
 * scheme is http or https and the exchange has not been routed yet; otherwise the
 * exchange is passed down the chain untouched.
 * <p>The client response and its connection are stored as exchange attributes, and
 * the status and filtered headers are copied onto the server response. The rest of
 * the chain runs after the response flux completes.
 *
 * @param exchange the current server exchange
 * @param chain the remaining gateway filter chain
 * @return a Mono for the proxied call followed by the remaining filters
 */
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
// Skip when the exchange was already routed or the target is not an http/https request
if (isAlreadyRouted(exchange) || (!"http".equalsIgnoreCase(scheme) && !"https".equalsIgnoreCase(scheme))) {
return chain.filter(exchange);
}
// Mark the exchange as routed before sending the request
setAlreadyRouted(exchange);
ServerHttpRequest request = exchange.getRequest();
final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
final String url = requestUrl.toASCIIString();
// Copy the filtered request headers into a Netty header container
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
// Build and send the proxied request; the response is captured in responseConnection below
Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {
headers.add(httpHeaders);
// Will either be set below, or later by Netty
headers.remove(HttpHeaders.HOST);
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
headers.add(HttpHeaders.HOST, host);
}
}).request(method).uri(url).send((req, nettyOutbound) -> {
if (log.isTraceEnabled()) {
nettyOutbound.withConnection(connection -> log.trace("outbound route: "
+ connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix()));
}
// Send the incoming request body as the outbound body
return nettyOutbound.send(request.getBody().map(this::getByteBuf));
}).responseConnection((res, connection) -> {
// Defer committing the response until all route filters have run
// Put client response as ServerWebExchange attribute and write
// response later NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
ServerHttpResponse response = exchange.getResponse();
// put headers and status so filters can modify the response
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
// Store the original Content-Type of the client response as an exchange attribute when present
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
}
setResponseStatus(res, response);
// make sure headers filters run after setting status so it is
// available in response
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
Type.RESPONSE);
if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
// It is not valid to have both the transfer-encoding header and
// the content-length header.
// Remove the transfer-encoding header in the response if the
// content-length header is present.
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
// Record which header names came from the client response, then copy them to the server response
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
response.getHeaders().addAll(filteredResponseHeaders);
return Mono.just(res);
});
// Apply the response timeout from getResponseTimeout(route), if any; a timeout becomes a 504 GATEWAY_TIMEOUT error
Duration responseTimeout = getResponseTimeout(route);
if (responseTimeout != null) {
responseFlux = responseFlux
.timeout(responseTimeout,
Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
.onErrorMap(TimeoutException.class,
th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
}
// Continue with the remaining filters once the response flux has completed
return responseFlux.then(chain.filter(exchange));
}
最后是ForwardRoutingFilter:它判断请求地址的scheme是否为forward,如果是(且尚未路由)则把请求重新交给DispatcherHandler处理,否则继续执行过滤器链
/**
 * Hands the exchange back to the DispatcherHandler when the request URL uses the
 * {@code forward} scheme and the exchange has not been routed yet; in every other
 * case the exchange simply continues down the filter chain.
 *
 * @param exchange the current server exchange; must carry GATEWAY_REQUEST_URL_ATTR
 * @param chain the remaining gateway filter chain
 * @return the Mono of the dispatcher handler or of the remaining chain
 */
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
    String scheme = requestUrl.getScheme();
    boolean shouldForward = !isAlreadyRouted(exchange) && "forward".equals(scheme);
    if (!shouldForward) {
        return chain.filter(exchange);
    }

    // TODO: translate url?

    if (log.isTraceEnabled()) {
        log.trace("Forwarding to URI: " + requestUrl);
    }

    return this.getDispatcherHandler().handle(exchange);
}
到此Spring Cloud Gateway的核心处理逻辑就分析完了,主要是针对核心逻辑链路的处理,很多细节都没深入,有兴趣可以自行debug看看