当前位置: 首页 > article >正文

springboot、springcloudnacos、netty-socketio实现im集群弹性伸缩和节点上下线监听

1、im-server 所有节点都注册到nacos服务中,使用nacos服务端2.4.3,客户端1.4.6,

spring-cloud-starter-alibaba-nacos-discovery版本  2021.1

2、im-listener 监听 im-server的上线和下线事件

3、springcloudalibaba  nacos监听服务上线和下线

配置文件

spring:
  redis:
    redisson:
      file:
        classpath:redisson.yaml
  application:
    # 应用名称
    name: im-listener
  cloud:
    nacos:
      discovery:
        # 服务注册地址
        server-addr: xxx.xxx.xxx.xx:xx
        namespace: cb329a1e-c20b-495a-885f-72076fc90d5f
        #心跳间隔。时间单位:毫秒。
        heart-beat-interval: 1000
        #心跳暂停。时间单位:毫秒。 即nacos服务端40秒收不到微服务客户端心跳,会将该微服务客户端注册的实例设为不健康
        heart-beat-timeout: 4000
        #Ip删除超时。时间单位:秒。即服务端90秒收不到客户端心跳,会将该微服务客户端注册的实例删除
        ip-delete-timeout: 9000
        #nacos 账号
        username: XXXXXXXXX
        # nacos 密码
        password: YYYYYYYYYY
        register-enabled: false # 注意:该服务无需注册到注册中心上,只用于获取注册中心上的服务信息就行了
    config:
      # 相同配置,本地优先
      override-none: true



server:
  port: 8080

im:
  port: 9092


代码

package com.yh.im.config;

import com.alibaba.cloud.nacos.NacosServiceManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * nacos 客户端 服务监听变化。当服务下线和上线的时候能够收到通知
 * @Date 2023/11/27 12:00
 */
@Component
@Slf4j
public class NacosDiscoveryListener {
    private final Set<String> subscribedServices = ConcurrentHashMap.newKeySet();
    @Resource
    private NacosServiceManager nacosServiceManager;
    private static Map<String, Map<String, Boolean>> instanceHealthStatus = new ConcurrentHashMap<>();


    /**
     * 构造一个事件监听器,主要作用是监听服务实例变化
     *
     * @return EventListener
     */
    private EventListener buildEventListener() {
        return event -> {
            if (event instanceof NamingEvent) {
                NamingEvent namingEvent = (NamingEvent) event;
                log.error("服务实例变化:{}", JSON.toJSONString(namingEvent));
                String serviceName = namingEvent.getServiceName();
                if (!instanceHealthStatus.containsKey(serviceName)) {
                    ConcurrentHashMap<String, Boolean> instanceMap = new ConcurrentHashMap<>();
                    instanceHealthStatus.put(serviceName, instanceMap);
                    List<Instance> newInstance = namingEvent.getInstances();
                    newInstance.forEach(instance -> {
                        String instanceKey = instance.getIp() + ":" + instance.getPort();
                        instanceMap.put(instanceKey, instance.isHealthy());
                        log.error("服务首次上线: {} -> {}", serviceName, instanceKey);
                    });
                    return;
                }
                List<ServiceInfo> allServiceInstances = getAllServiceInstances();
                int instanceTotal = allServiceInstances.stream()
                        .mapToInt(serviceInfo -> Integer.parseInt(serviceInfo.getClusters()))
                        .sum();
                Map<String, Boolean> serviceMap = instanceHealthStatus.computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>());
                Set<String> oldInstanceKeys = new HashSet<>(serviceMap.keySet());
                List<Instance> newInstance = namingEvent.getInstances();
                Set<String> newInstanceKeys = newInstance.stream()
                        .map(instance -> instance.getIp() + ":" + instance.getPort())
                        .collect(Collectors.toSet());
                int oldSize = serviceMap.size();
                int newSize = namingEvent.getInstances().size();
                // 服务实例没有增减,只是状态变化
                if (oldSize == newSize) {
                    newInstance.forEach((instance) -> {
                        String instanceKey = instance.getIp() + ":" + instance.getPort();
                        if (instance.isHealthy() != serviceMap.get(instanceKey)) {
                            if (instance.isHealthy()) {
                                log.error("服务上线: {} -> {}", serviceName, instanceKey);

                            } else {
                                log.error("服务下线: {} -> {}", serviceName, instanceKey);

                            }
                            serviceMap.put(instanceKey, instance.isHealthy());
                        }
                    });
                }
                // 下线实例
                if (oldSize > newSize) {
                    newInstanceKeys.forEach(oldInstanceKeys::remove);
                    oldInstanceKeys.forEach(instanceKey -> log.error("服务下线: {} -> {}", serviceName, instanceKey));
                } else {
                    // 上线实例
                    newInstanceKeys.removeAll(oldInstanceKeys);
                    StringBuffer noticeTitle = new StringBuffer("服务上线通知");
                    newInstanceKeys.forEach(instanceKey -> {
                        String message = String.format("[%s-%s]", serviceName, instanceKey);
                        log.info(message);
                        noticeTitle.append(message).append(",");
                    });

                }
            }
        };
    }
    /**
     * 定时获取服务列表,然后根据获取到的服务名,进行订阅,
     * nacos客户端目前不能订阅所有服务,只能手动的订阅
     * 也可以不用定时需要的时候通过getAllServiceInstances获取
     */
    @Scheduled(fixedDelay = 5000)
    public void reportServices() {
        List<String> services = null;
        try {
            Properties properties = new Properties();
            NamingService namingService = nacosServiceManager.getNamingService(properties);

            services = namingService.getServicesOfServer(1, Integer.MAX_VALUE).getData();
            services.forEach(serviceName -> {
                if (!subscribedServices.contains(serviceName)) {
                    try {
                        namingService.subscribe(serviceName, buildEventListener());
                        subscribedServices.add(serviceName);
                    } catch (NacosException e) {
                        log.error("订阅服务失败", e);
                    }
                }
            });
        } catch (NacosException e) {
            log.error("获取服务列表失败", e);
        }
    }

    /**
     * 获取所有服务实例
     * @return  服务实例列表
     */
    public List<ServiceInfo> getAllServiceInstances() {
        List<ServiceInfo> serviceInfos = new ArrayList<>();
        try {
            Properties properties = new Properties();
            NamingService namingService = nacosServiceManager.getNamingService(properties);
            List<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE).getData();
            for (String serviceName : services) {
                List<Instance> onlineInstances = namingService.selectInstances(serviceName, true);
                // 下线服务暂时不用关注
                List<Instance> offlineInstances = namingService.selectInstances(serviceName, false);
                serviceInfos.add(new ServiceInfo(serviceName, String.valueOf(onlineInstances.size())));
            }
        } catch (NacosException e) {
            e.printStackTrace();
        }
        return serviceInfos;
    }

}


http://www.kler.cn/a/451485.html

相关文章:

  • 三维动画的常用“视觉特效”有哪些?
  • 【ES6复习笔记】Symbol 类型及其应用(9)
  • [Android]按下扫描键时启动一个线程来执行某些操作
  • 大语言模型学习工具及资源总结和落地应用
  • 网络安全攻防演练中的常见计策
  • 重温设计模式--2、设计模式七大原则
  • 工业相机镜头选型知识详解
  • 学习笔记(prism--视频【WPF-prism核心教程】)--待更新
  • 突围边缘:OpenAI开源实时嵌入式API,AI触角延伸至微观世界
  • Spark和Hadoop之间的区别
  • 后端接口返回文件流,前端下载(java+vue)
  • 特殊的“Undefined Reference xxx“编译错误
  • Rust 在前端基建中的使用
  • 深度学习在灾难恢复中的作用:智能运维的新时代
  • 【数据结构】数据结构整体大纲
  • 面试题整理18----Pause容器的用途
  • 代码随想录 day52 第十一章 图论part03
  • 医疗行业 UI 设计系列合集(一):精准定位
  • 【AI驱动的数据结构:包装类的艺术与科学】
  • 如何学习Trustzone
  • Linux下载RabbitMQ,并解决Github拒绝访问443的问题
  • 【仓颉语言体验】Hello World TCP客户端 C/C++ or Python
  • ResEmoteNet论文阅读与推理
  • 【可视化开源性能压测工具】小巧而强大的oha
  • 【数据结构2】线性表——顺序表
  • 动态规划:石子合并 图文+举例超详细说明