当前位置: 首页 > article >正文

ZooKeeper注册中心实现

具体步骤

  1. 安装ZooKeeper(启动端口占用,2181:客户端,8080:管理端)
  2. 引入客户端依赖
  3. 实现注册中心接口
  4. SPI补充ZooKeeper注册中心

引入依赖

<!-- zookeeper -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>5.6.0</version>
</dependency>

ZooKeeper注册中心实现

/**
 * zookeeper 注册中心
 */
public class ZooKeeperRegistry implements Registry {

    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperRegistry.class);

    private CuratorFramework client;

    private ServiceDiscovery<ServiceMetaInfo> serviceDiscovery;

    /**
     * 本机注册的节点 key 集合(用于维护续期)
     */
    private final Set<String> localRegisterNodeKeySet = new HashSet<>();

    /**
     * 注册中心服务缓存
     */
    private final RegistryServiceMultiCache registryServiceMultiCache = new RegistryServiceMultiCache();

    /**
     * 正在监听的 key 集合
     */
    private final Set<String> watchingKeySet = new ConcurrentHashSet<>();

    /**
     * 根节点
     */
    private static final String ZK_ROOT_PATH = "/rpc/zk";

    @Override
    public void init(RegistryConfig registryConfig) {
        // 构建 client 实例
        client = CuratorFrameworkFactory
                .builder()
                .connectString(registryConfig.getAddress())
                .retryPolicy(new ExponentialBackoffRetry(Math.toIntExact(registryConfig.getTimeout()), 3))
                .build();

        // 构建 serviceDiscovery 实例
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMetaInfo.class)
                .client(client)
                .basePath(ZK_ROOT_PATH)
                .serializer(new JsonInstanceSerializer<>(ServiceMetaInfo.class))
                .build();

        try {
            // 启动 client 和 serviceDiscovery
            client.start();
            serviceDiscovery.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
        // 注册到 zk 里
        serviceDiscovery.registerService(buildServiceInstance(serviceMetaInfo));

        // 添加节点信息到本地缓存
        String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey();
        localRegisterNodeKeySet.add(registerKey);
    }

    @Override
    public void unRegister(ServiceMetaInfo serviceMetaInfo) {
        try {
            serviceDiscovery.unregisterService(buildServiceInstance(serviceMetaInfo));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        // 从本地缓存移除
        String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey();
        localRegisterNodeKeySet.remove(registerKey);
    }

    @Override
    public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
        // 优先从缓存获取服务
        List<ServiceMetaInfo> cachedServiceMetaInfoList = registryServiceMultiCache.readCache(serviceKey);
        if (cachedServiceMetaInfoList != null) {
            return cachedServiceMetaInfoList;
        }

        try {
            // 查询服务信息
            Collection<ServiceInstance<ServiceMetaInfo>> serviceInstanceList = serviceDiscovery.queryForInstances(serviceKey);

            // 解析服务信息
            List<ServiceMetaInfo> serviceMetaInfoList = serviceInstanceList.stream()
                    .map(ServiceInstance::getPayload)
                    .collect(Collectors.toList());

            // 写入服务缓存
            registryServiceMultiCache.writeCache(serviceKey, serviceMetaInfoList);
            return serviceMetaInfoList;
        } catch (Exception e) {
            throw new RuntimeException("获取服务列表失败", e);
        }
    }

    @Override
    public void heartBeat() {
        // 不需要心跳机制,建立了临时节点,如果服务器故障,则临时节点直接丢失
    }

    /**
     * 监听(消费端)
     *
     * @param serviceNodeKey 服务节点 key
     */
    @Override
    public void watch(String serviceNodeKey) {
        String watchKey = ZK_ROOT_PATH + "/" + serviceNodeKey;
        boolean newWatch = watchingKeySet.add(watchKey);
        if (newWatch) {
            CuratorCache curatorCache = CuratorCache.build(client, watchKey);
            curatorCache.start();
            curatorCache.listenable().addListener(
                    CuratorCacheListener
                            .builder()
                            .forDeletes(childData -> registryServiceMultiCache.clearCache(serviceNodeKey))
                            .forChanges(((oldNode, node) -> registryServiceMultiCache.clearCache(serviceNodeKey)))
                            .build()
            );
        }
    }

    @Override
    public void destroy() {
        logger.info("zookeeper注册中心下线...");
        // 下线节点(这一步可以不做,因为都是临时节点,服务下线,节点就被删掉)
        for (String key : localRegisterNodeKeySet) {
            try {
                client.delete().guaranteed().forPath(key);
            } catch (Exception e) {
                throw new RuntimeException(key + "节点下线失败");
            }
        }

        // 释放资源
        if (client != null) {
            client.close();
        }
    }

    private ServiceInstance<ServiceMetaInfo> buildServiceInstance(ServiceMetaInfo serviceMetaInfo) {
        String serviceAddress = serviceMetaInfo.getServiceHost() + ":" + serviceMetaInfo.getServicePort();
        try {
            return ServiceInstance
                    .<ServiceMetaInfo>builder()
                    .id(serviceAddress)
                    .name(serviceMetaInfo.getServiceKey())
                    .address(serviceAddress)
                    .payload(serviceMetaInfo)
                    .build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

添加SPI配置

etcd=com.starlink.registry.EtcdRegistry
zookeeper=com.starlink.registry.ZooKeeperRegistry

image-20241229170512331

最后配置文件指定ZooKeeper为注册中心即可使用ZooKeeper注册中心

rpc.registryConfig.registry=zookeeper
rpc.registryConfig.address=localhost:2181

http://www.kler.cn/a/459192.html

相关文章:

  • Acwing 多重背包板子
  • 【Compose multiplatform教程25】拖放操作
  • 41.3 将重查询记录增量更新到consul和redis中
  • Flink operator实现自动扩缩容
  • CORS:跨域访问、如何在Nginx中配置允许跨域访问
  • STM32 SPI读取SD卡
  • 使用 ASP.NET Core wwwroot 上传和存储文件
  • MySQL内存分析常用语句
  • 基本算法——聚类
  • 基于eBPF的微服务网络安全(Cilium 1)
  • spring-boot 日志配置的几种方式
  • 【每日学点鸿蒙知识】Shape描述、全局loading组件、checkbox样式、H5监听键盘收起、弹窗不关闭
  • 利用JavaScript实现猜数字
  • k8S-foundation-_ label、ns
  • Java设计模式 —— 【行为型模式】模板方法模式(Template Method Pattern) 详解
  • EasyExcel(环境搭建以及常用写入操作)
  • 智能工厂的设计软件 应用场景的一个例子:为AI聊天工具添加一个知识系统 之8 重新开始 之1
  • 如何在 Ubuntu 22.04 上添加 Swap 内存
  • nginx中try_files $uri $uri index.html的作用 和 $uri的含义
  • 【每日学点鸿蒙知识】PersistentStorage持久化、插槽方法、相对布局、上拉加载下拉刷新、List联动滑动
  • 【GO基础学习】Gin 框架中间件的详解
  • Error: The Calculated NPWS= 84330 != The Read NPWS= 84328
  • 优化租赁小程序提升服务效率与用户体验的策略与实践
  • 代码随想录算法训练营第六天 | 242. 有效的字母异位词、349. 两个数组的交集、202. 快乐数、1. 两数之和
  • 如何利用无线路由器实现水泵房远程监测管理
  • 关于科研工具的思考