微服务-流量染色
1. 功能目的
通过设置请求头的方式将http请求优先打到指定的服务上,为微服务开发调试工作提供便利
- 请求报文难模拟:可以直接在测试环境页面上操作,流量直接打到本地IDEA进行debug
- 请求链路较长:本地开发无需启动所有服务,仅需要启动目标服务
- 协同开发:与其他人一同开发,并且依赖对方开发的接口,可以直接将自己本地服务的请求发到对方本地服务上
这是目前主要的使用目的,当然也可以调整负载均衡逻辑,配合网关的一些自定义配置,扩充为灰度发布的效果
2. 实现原理
通过在网关以及Ribbon,实现自定义的负载均衡策略,将请求引流到本地。
PS:需要服务器能访问本地,需要类似OpenVPN这样赋予本地一个IP,供服务器网关能请求到本地
开启OpenVPN之后本机会有多个IP,通过配置指定注册到nacos上的IP
spring.cloud.nacos.discovery.network-interface: 10.0
2.1 本地服务调整
本地启动时,配置文件添加参数,设置一个元数据作为服务的流量标识
比如mdm服务配置参数
spring.cloud.nacos.discovery.metadata.request-mark: azhuzhu
服务启动之后,我们可以在nacos的服务列表里看到元数据
2.2 网关负载均衡
服务分类:
- 服务器服务:metadata中没有 requestMark 参数
- 本地服务:metadata中带有 requestMark 参数
网关实现自定义负载均衡策略:
- 判断请求头中是否带有本地流量标识:requestMark
- 有标识:判断有无metadata匹配的服务实例
- 有:调用匹配的服务实例
- 无:判断目标请求有没有服务器服务实例
- 有:服务器服务随机数负载
- 无:可用实例随机负载
- 无标识:判断目标请求有没有服务器服务实例
- 有:服务器服务随机数负载
- 无:可用实例随机负载
网关负载处理了第一目标服务,假如调用链路为 mdm -> commom-api,我们启动的是common-api,则需要在服务端的ribbon中做负载
- 有标识:判断有无metadata匹配的服务实例
2.3 请求发起
比如,约定request-mark作为本地流量标识
请求头 request-mark=azhuzhu 表示流量优先打到带有元数据 request-mark=azhuzhu 的服务
- 浏览器操作:通过浏览器插件ModHeader,在浏览器发起请求时,带上请求头
- HTTP工具:带上自定义请求头
3. 具体代码介绍
以下所有注册的bean, 都通过指定的配置参数开启
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
3.1 自定义负载均衡器
网关及其他客户端的流量染色具体的负载逻辑实现
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
/**
* 自定义负载均衡器 用于开发环境的流量染色
*
* @author 阿猪 2024-08-09 11:45
*/
@Slf4j
@Configuration
@SuppressWarnings("deprecation")
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
@LoadBalancerClients(defaultConfiguration = {ReqMarkLoadBalancer.class})
public class ReqMarkLoadBalancer {
public static final String REQUEST_MARK = "request-mark";
/**
* 开启流量染色时 替换默认的负载器
*
* @param environment 环境信息
* @param loadBalancerClientFactory 负载器工厂
* @return 自定义负载器
*/
@Bean
@Primary
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RandomLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
static class RandomLoadBalancer implements ReactorServiceInstanceLoadBalancer {
private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
private final String serviceId;
public RandomLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
String serviceId) {
this.serviceId = serviceId;
this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
}
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier =
serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
String requestMark = getRequestMark(request);
return supplier.get().next().map(serviceInstances -> processInstanceResponseByReqMark(serviceInstances, requestMark));
}
private String getRequestMark(Request<?> request) {
// 客户端的负载 直接从 RequestContextHolder 拿请求头
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes != null) {
return attributes.getRequest().getHeader(REQUEST_MARK);
}
// 网关的负载从 request 取值(网关覆盖了默认实现 把context塞进去了 不然拿到是 null 跟spring boot版本有关系)
if (!(request.getContext() instanceof ServerHttpRequest)) {
return null;
}
ServerHttpRequest context = (ServerHttpRequest) request.getContext();
if (context.getHeaders().containsKey(REQUEST_MARK)) {
return context.getHeaders().getFirst(REQUEST_MARK);
}
return null;
}
/**
* 默认的随机数负载
*
* @param instances 可用服务实例
* @return 命中实例
*/
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (CollectionUtils.isEmpty(instances)) {
log.warn("No instance available {}", serviceId);
return new EmptyResponse();
}
Random random = new Random();
ServiceInstance instance = instances.get(random.nextInt(instances.size()));
return new DefaultResponse(instance);
}
private Response<ServiceInstance> processInstanceResponseByReqMark(List<ServiceInstance> instances, String requestMark) {
if (instances.isEmpty()) {
return new EmptyResponse();
}
ServiceInstance sameClusterNameInst = selectInstanceByReqMark(instances, requestMark);
return new DefaultResponse(sameClusterNameInst);
}
private ServiceInstance selectInstanceByReqMark(List<ServiceInstance> instances, String requestMark) {
// 元数据不带请求标识的服务, 标识为服务器上的服务
List<ServiceInstance> serverInstances = instances.stream().filter(instance -> {
Map<String, String> metadata = instance.getMetadata();
return MapUtils.isEmpty(metadata) || !metadata.containsKey(REQUEST_MARK);
}).collect(Collectors.toList());
if (StringUtils.isBlank(requestMark)) {
if (CollectionUtils.isEmpty(serverInstances)) {
return instances.get(new Random().nextInt(instances.size()));
}
return serverInstances.get(new Random().nextInt(serverInstances.size()));
}
List<ServiceInstance> matchInstances = Lists.newArrayList();
for (ServiceInstance instance : instances) {
Map<String, String> metadata = instance.getMetadata();
if (MapUtils.isEmpty(metadata)) {
continue;
}
if (metadata.containsKey(REQUEST_MARK) && requestMark.equals(metadata.get(REQUEST_MARK))) {
matchInstances.add(instance);
}
}
Random random = new Random();
// 优先匹配到的服务 最后是随机
if (CollectionUtils.isNotEmpty(matchInstances)) {
return matchInstances.get(random.nextInt(matchInstances.size()));
}
// 然后是无标识服务(服务器上的服务)
if (CollectionUtils.isNotEmpty(serverInstances)) {
return serverInstances.get(random.nextInt(serverInstances.size()));
}
// 前两者都没有就随机获取
return instances.get(random.nextInt(instances.size()));
}
}
}
3.2 流量标识请求头透传
这里使用Feign进行内部服务调用,需要将原请求的流量标识 请求头继续传递下去,保证后续的服务链路也能有流量染色的效果。
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
/**
* 流量染色 - 流量标识请求头透传
*
* @author 阿猪 2024-09-25 17:11
*/
@Component
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
public class ReqMarkRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate requestTemplate) {
// 从 request 中获取流量标识, 设置到 feign 的 requestTemplate中
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (attributes != null) {
requestTemplate.header(ReqMarkLoadBalancer.REQUEST_MARK, attributes.getRequest().getHeader(ReqMarkLoadBalancer.REQUEST_MARK));
}
}
}
3.3 网关负载均衡-请求信息获取
由于这个方案中,负载均衡是依靠 请求头 判断的,详见上面请求头的获取ReqMarkLoadBalancer.getRequestMark
在spring boot 2.3.2 版本中 request.getContext是个空的,没法获取请求信息
2.6.x 后面没有这个问题,但需要关注下这个context的类型,调整代码
以下是覆盖默认实现,为 request 的 context 设置请求信息。
实际上是复制 ReactiveLoadBalancerClientFilter的源码稍作修改,看倒数最后两行
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.DefaultRequest;
import org.springframework.cloud.client.loadbalancer.reactive.Request;
import org.springframework.cloud.client.loadbalancer.reactive.Response;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.gateway.support.DelegatingServiceInstance;
import org.springframework.cloud.gateway.support.NotFoundException;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.net.URI;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.*;
/**
* 覆盖默认的负载均衡器 改了choose方法 将request信息传入
*
* @see ReactiveLoadBalancerClientFilter
* @author 阿猪 2024-08-09 17:06
*/
@Slf4j
@Component
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
public class CustomLoadBalancerClientFilter extends ReactiveLoadBalancerClientFilter {
private static final int LOAD_BALANCER_CLIENT_FILTER_ORDER = 10150;
private final LoadBalancerClientFactory clientFactory;
private final LoadBalancerProperties properties;
public CustomLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,
LoadBalancerProperties properties) {
super(null, null);
this.clientFactory = clientFactory;
this.properties = properties;
}
@Override
public int getOrder() {
return LOAD_BALANCER_CLIENT_FILTER_ORDER;
}
@Override
@SuppressWarnings("Duplicates")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
// preserve the original url
addOriginalRequestUrl(exchange, url);
if (log.isTraceEnabled()) {
log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName()
+ " url before: " + url);
}
return choose(exchange).doOnNext(response -> {
if (!response.hasServer()) {
throw NotFoundException.create(properties.isUse404(),
"Unable to find instance for " + url.getHost());
}
ServiceInstance retrievedInstance = response.getServer();
URI uri = exchange.getRequest().getURI();
// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(
retrievedInstance, overrideScheme);
URI requestUrl = reconstructURI(serviceInstance, uri);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
}).then(chain.filter(exchange));
}
@SuppressWarnings("deprecation")
private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {
URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory
.getInstance(uri.getHost(), ReactorServiceInstanceLoadBalancer.class);
if (loadBalancer == null) {
throw new NotFoundException("No loadbalancer available for " + uri.getHost());
}
// 就改了这里 仅调整参数传入 保持原有逻辑(原代码传入了空的request)
Request<?> request = new DefaultRequest<>(exchange.getRequest());
return loadBalancer.choose(request);
}
}
将bean覆盖掉,替换掉原本的 ReactiveLoadBalancerClientFilter
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.gateway.config.LoadBalancerProperties;
import org.springframework.cloud.gateway.filter.ReactiveLoadBalancerClientFilter;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 流量染色 - 将ReactiveLoadBalancerClientFilter覆盖掉 为了获取到http请求头
*
* @author 阿猪 2024-08-09 17:12
*/
@Configuration
public class CustomLoadBalancerConfig {
@Bean
@ConditionalOnProperty(name = "hg.request-mark.enable", havingValue = "true")
public ReactiveLoadBalancerClientFilter gatewayLoadBalancerClientFilter(LoadBalancerClientFactory clientFactory,
LoadBalancerProperties properties){
return new CustomLoadBalancerClientFilter(clientFactory, properties);
}
}