ZooKeeper注册中心实现
具体步骤
- 安装ZooKeeper(启动端口占用,2181:客户端,8080:管理端)
- 引入客户端依赖
- 实现注册中心接口
- SPI补充ZooKeeper注册中心
引入依赖
<!-- zookeeper -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>5.6.0</version>
</dependency>
ZooKeeper注册中心实现
/**
* zookeeper 注册中心
*/
public class ZooKeeperRegistry implements Registry {
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperRegistry.class);
private CuratorFramework client;
private ServiceDiscovery<ServiceMetaInfo> serviceDiscovery;
/**
* 本机注册的节点 key 集合(用于维护续期)
*/
private final Set<String> localRegisterNodeKeySet = new HashSet<>();
/**
* 注册中心服务缓存
*/
private final RegistryServiceMultiCache registryServiceMultiCache = new RegistryServiceMultiCache();
/**
* 正在监听的 key 集合
*/
private final Set<String> watchingKeySet = new ConcurrentHashSet<>();
/**
* 根节点
*/
private static final String ZK_ROOT_PATH = "/rpc/zk";
@Override
public void init(RegistryConfig registryConfig) {
// 构建 client 实例
client = CuratorFrameworkFactory
.builder()
.connectString(registryConfig.getAddress())
.retryPolicy(new ExponentialBackoffRetry(Math.toIntExact(registryConfig.getTimeout()), 3))
.build();
// 构建 serviceDiscovery 实例
serviceDiscovery = ServiceDiscoveryBuilder.builder(ServiceMetaInfo.class)
.client(client)
.basePath(ZK_ROOT_PATH)
.serializer(new JsonInstanceSerializer<>(ServiceMetaInfo.class))
.build();
try {
// 启动 client 和 serviceDiscovery
client.start();
serviceDiscovery.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
// 注册到 zk 里
serviceDiscovery.registerService(buildServiceInstance(serviceMetaInfo));
// 添加节点信息到本地缓存
String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey();
localRegisterNodeKeySet.add(registerKey);
}
@Override
public void unRegister(ServiceMetaInfo serviceMetaInfo) {
try {
serviceDiscovery.unregisterService(buildServiceInstance(serviceMetaInfo));
} catch (Exception e) {
throw new RuntimeException(e);
}
// 从本地缓存移除
String registerKey = ZK_ROOT_PATH + "/" + serviceMetaInfo.getServiceNodeKey();
localRegisterNodeKeySet.remove(registerKey);
}
@Override
public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
// 优先从缓存获取服务
List<ServiceMetaInfo> cachedServiceMetaInfoList = registryServiceMultiCache.readCache(serviceKey);
if (cachedServiceMetaInfoList != null) {
return cachedServiceMetaInfoList;
}
try {
// 查询服务信息
Collection<ServiceInstance<ServiceMetaInfo>> serviceInstanceList = serviceDiscovery.queryForInstances(serviceKey);
// 解析服务信息
List<ServiceMetaInfo> serviceMetaInfoList = serviceInstanceList.stream()
.map(ServiceInstance::getPayload)
.collect(Collectors.toList());
// 写入服务缓存
registryServiceMultiCache.writeCache(serviceKey, serviceMetaInfoList);
return serviceMetaInfoList;
} catch (Exception e) {
throw new RuntimeException("获取服务列表失败", e);
}
}
@Override
public void heartBeat() {
// 不需要心跳机制,建立了临时节点,如果服务器故障,则临时节点直接丢失
}
/**
* 监听(消费端)
*
* @param serviceNodeKey 服务节点 key
*/
@Override
public void watch(String serviceNodeKey) {
String watchKey = ZK_ROOT_PATH + "/" + serviceNodeKey;
boolean newWatch = watchingKeySet.add(watchKey);
if (newWatch) {
CuratorCache curatorCache = CuratorCache.build(client, watchKey);
curatorCache.start();
curatorCache.listenable().addListener(
CuratorCacheListener
.builder()
.forDeletes(childData -> registryServiceMultiCache.clearCache(serviceNodeKey))
.forChanges(((oldNode, node) -> registryServiceMultiCache.clearCache(serviceNodeKey)))
.build()
);
}
}
@Override
public void destroy() {
logger.info("zookeeper注册中心下线...");
// 下线节点(这一步可以不做,因为都是临时节点,服务下线,节点就被删掉)
for (String key : localRegisterNodeKeySet) {
try {
client.delete().guaranteed().forPath(key);
} catch (Exception e) {
throw new RuntimeException(key + "节点下线失败");
}
}
// 释放资源
if (client != null) {
client.close();
}
}
private ServiceInstance<ServiceMetaInfo> buildServiceInstance(ServiceMetaInfo serviceMetaInfo) {
String serviceAddress = serviceMetaInfo.getServiceHost() + ":" + serviceMetaInfo.getServicePort();
try {
return ServiceInstance
.<ServiceMetaInfo>builder()
.id(serviceAddress)
.name(serviceMetaInfo.getServiceKey())
.address(serviceAddress)
.payload(serviceMetaInfo)
.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
添加SPI配置
etcd=com.starlink.registry.EtcdRegistry
zookeeper=com.starlink.registry.ZooKeeperRegistry
最后配置文件指定ZooKeeper为注册中心即可使用ZooKeeper注册中心
rpc.registryConfig.registry=zookeeper
rpc.registryConfig.address=localhost:2181