springboot、springcloudnacos、netty-socketio实现im集群弹性伸缩和节点上下线监听
1、im-server 的所有节点都注册到 nacos 中,使用 nacos 服务端 2.4.3、客户端 1.4.6,
spring-cloud-starter-alibaba-nacos-discovery 版本为 2021.1
2、im-listener 监听 im-server的上线和下线事件
3、springcloudalibaba nacos监听服务上线和下线
配置文件
spring:
  redis:
    redisson:
      file: classpath:redisson.yaml
  application:
    # 应用名称
    name: im-listener
  cloud:
    nacos:
      discovery:
        # 服务注册地址
        server-addr: xxx.xxx.xxx.xx:xx
        namespace: cb329a1e-c20b-495a-885f-72076fc90d5f
        # 心跳间隔。时间单位:毫秒。
        heart-beat-interval: 1000
        # 心跳超时。时间单位:毫秒。即nacos服务端4秒(4000毫秒)收不到微服务客户端心跳,会将该微服务客户端注册的实例设为不健康
        heart-beat-timeout: 4000
        # Ip删除超时。时间单位:毫秒。即服务端9秒(9000毫秒)收不到客户端心跳,会将该微服务客户端注册的实例删除
        ip-delete-timeout: 9000
        # nacos 账号
        username: XXXXXXXXX
        # nacos 密码
        password: YYYYYYYYYY
        register-enabled: false # 注意:该服务无需注册到注册中心上,只用于获取注册中心上的服务信息就行了
    config:
      # 相同配置,本地优先
      override-none: true
server:
  port: 8080
im:
  port: 9092
代码
package com.yh.im.config;
import com.alibaba.cloud.nacos.NacosServiceManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* nacos 客户端 服务监听变化。当服务下线和上线的时候能够收到通知
* @Date 2023/11/27 12:00
*/
@Component
@Slf4j
public class NacosDiscoveryListener {
private final Set<String> subscribedServices = ConcurrentHashMap.newKeySet();
@Resource
private NacosServiceManager nacosServiceManager;
private static Map<String, Map<String, Boolean>> instanceHealthStatus = new ConcurrentHashMap<>();
/**
* 构造一个事件监听器,主要作用是监听服务实例变化
*
* @return EventListener
*/
private EventListener buildEventListener() {
return event -> {
if (event instanceof NamingEvent) {
NamingEvent namingEvent = (NamingEvent) event;
log.error("服务实例变化:{}", JSON.toJSONString(namingEvent));
String serviceName = namingEvent.getServiceName();
if (!instanceHealthStatus.containsKey(serviceName)) {
ConcurrentHashMap<String, Boolean> instanceMap = new ConcurrentHashMap<>();
instanceHealthStatus.put(serviceName, instanceMap);
List<Instance> newInstance = namingEvent.getInstances();
newInstance.forEach(instance -> {
String instanceKey = instance.getIp() + ":" + instance.getPort();
instanceMap.put(instanceKey, instance.isHealthy());
log.error("服务首次上线: {} -> {}", serviceName, instanceKey);
});
return;
}
List<ServiceInfo> allServiceInstances = getAllServiceInstances();
int instanceTotal = allServiceInstances.stream()
.mapToInt(serviceInfo -> Integer.parseInt(serviceInfo.getClusters()))
.sum();
Map<String, Boolean> serviceMap = instanceHealthStatus.computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>());
Set<String> oldInstanceKeys = new HashSet<>(serviceMap.keySet());
List<Instance> newInstance = namingEvent.getInstances();
Set<String> newInstanceKeys = newInstance.stream()
.map(instance -> instance.getIp() + ":" + instance.getPort())
.collect(Collectors.toSet());
int oldSize = serviceMap.size();
int newSize = namingEvent.getInstances().size();
// 服务实例没有增减,只是状态变化
if (oldSize == newSize) {
newInstance.forEach((instance) -> {
String instanceKey = instance.getIp() + ":" + instance.getPort();
if (instance.isHealthy() != serviceMap.get(instanceKey)) {
if (instance.isHealthy()) {
log.error("服务上线: {} -> {}", serviceName, instanceKey);
} else {
log.error("服务下线: {} -> {}", serviceName, instanceKey);
}
serviceMap.put(instanceKey, instance.isHealthy());
}
});
}
// 下线实例
if (oldSize > newSize) {
newInstanceKeys.forEach(oldInstanceKeys::remove);
oldInstanceKeys.forEach(instanceKey -> log.error("服务下线: {} -> {}", serviceName, instanceKey));
} else {
// 上线实例
newInstanceKeys.removeAll(oldInstanceKeys);
StringBuffer noticeTitle = new StringBuffer("服务上线通知");
newInstanceKeys.forEach(instanceKey -> {
String message = String.format("[%s-%s]", serviceName, instanceKey);
log.info(message);
noticeTitle.append(message).append(",");
});
}
}
};
}
/**
* 定时获取服务列表,然后根据获取到的服务名,进行订阅,
* nacos客户端目前不能订阅所有服务,只能手动的订阅
* 也可以不用定时需要的时候通过getAllServiceInstances获取
*/
@Scheduled(fixedDelay = 5000)
public void reportServices() {
List<String> services = null;
try {
Properties properties = new Properties();
NamingService namingService = nacosServiceManager.getNamingService(properties);
services = namingService.getServicesOfServer(1, Integer.MAX_VALUE).getData();
services.forEach(serviceName -> {
if (!subscribedServices.contains(serviceName)) {
try {
namingService.subscribe(serviceName, buildEventListener());
subscribedServices.add(serviceName);
} catch (NacosException e) {
log.error("订阅服务失败", e);
}
}
});
} catch (NacosException e) {
log.error("获取服务列表失败", e);
}
}
/**
* 获取所有服务实例
* @return 服务实例列表
*/
public List<ServiceInfo> getAllServiceInstances() {
List<ServiceInfo> serviceInfos = new ArrayList<>();
try {
Properties properties = new Properties();
NamingService namingService = nacosServiceManager.getNamingService(properties);
List<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE).getData();
for (String serviceName : services) {
List<Instance> onlineInstances = namingService.selectInstances(serviceName, true);
// 下线服务暂时不用关注
List<Instance> offlineInstances = namingService.selectInstances(serviceName, false);
serviceInfos.add(new ServiceInfo(serviceName, String.valueOf(onlineInstances.size())));
}
} catch (NacosException e) {
e.printStackTrace();
}
return serviceInfos;
}
}