Zookeeper实现分布式锁、Zookeeper实现配置中心
一、Zookeeper实现分布式锁
分布式锁主要用于在分布式环境中保证数据的一致性。
包括跨进程、跨机器、跨网络导致共享资源不一致的问题。
1.Zookeeper分布式锁的代码实现
新建一个maven项目ZK-Demo,然后在pom.xml里面引入相关的依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
2. Zookeeper分布式锁的核心代码实现
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @Description: Zookeeper分布式锁的核心代码实现
* @author leeSmall
* @date 2018年9月4日
*
*/
public class DistributedLock implements Lock {
private static Logger logger = LoggerFactory.getLogger(DistributedLock.class);
private static final String ZOOKEEPER_IP_PORT = "192.168.152.130:2181";
private static final String LOCK_PATH = "/LOCK";
private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT, 4000, 4000, new SerializableSerializer());
private CountDownLatch cdl;
private String beforePath;// 当前请求的节点前一个节点
private String currentPath;// 当前请求的节点
// 判断有没有LOCK目录,没有则创建
public DistributedLock() {
if (!this.client.exists(LOCK_PATH)) {
this.client.createPersistent(LOCK_PATH);
}
}
public void lock() {
//尝试去获取分布式锁失败
if (!tryLock()) {
//对次小节点进行监听
waitForLock();
lock();
}
else {
logger.info(Thread.currentThread().getName() + " 获得分布式锁!");
}
}
public boolean tryLock() {
// 如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
if (currentPath == null || currentPath.length() <= 0) {
// 创建一个临时顺序节点
currentPath = this.client.createEphemeralSequential(LOCK_PATH + '/', "lock");
System.out.println("---------------------------->" + currentPath);
}
// 获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
List<String> childrens = this.client.getChildren(LOCK_PATH);
//由小到大排序所有子节点
Collections.sort(childrens);
//判断创建的子节点/LOCK/Node-n是否最小,即currentPath,如果当前节点等于childrens中的最小的一个就占用锁
if (currentPath.equals(LOCK_PATH + '/' + childrens.get(0))) {
return true;
}
//找出比创建的临时顺序节子节点/LOCK/Node-n次小的节点,并赋值给beforePath
else {
int wz = Collections.binarySearch(childrens, currentPath.substring(6));
beforePath = LOCK_PATH + '/' + childrens.get(wz - 1);
}
return false;
}
//等待锁,对次小节点进行监听
private void waitForLock() {
IZkDataListener listener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
logger.info(Thread.currentThread().getName() + ":捕获到DataDelete事件!---------------------------");
if (cdl != null) {
cdl.countDown();
}
}
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
// 对次小节点进行监听,即beforePath-给排在前面的的节点增加数据删除的watcher
this.client.subscribeDataChanges(beforePath, listener);
if (this.client.exists(beforePath)) {
cdl = new CountDownLatch(1);
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.client.unsubscribeDataChanges(beforePath, listener);
}
//完成业务逻辑以后释放锁
public void unlock() {
// 删除当前临时节点
client.delete(currentPath);
}
// ==========================================
public void lockInterruptibly() throws InterruptedException {
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
public Condition newCondition() {
return null;
}
}
3.2 在业务里面使用分布式锁
package com.study.demo.lock;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @Description: 在业务里面使用分布式锁
* @author leeSmall
* @date 2018年9月4日
*
*/
public class OrderServiceImpl implements Runnable {
private static OrderCodeGenerator ong = new OrderCodeGenerator();
private Logger logger = LoggerFactory.getLogger(OrderServiceImpl.class);
// 同时并发的线程数
private static final int NUM = 10;
// 按照线程数初始化倒计数器,倒计数器
private static CountDownLatch cdl = new CountDownLatch(NUM);
private Lock lock = new DistributedLock();
// 创建订单接口
public void createOrder() {
String orderCode = null;
//准备获取锁
lock.lock();
try {
// 获取订单编号
orderCode = ong.getOrderCode();
} catch (Exception e) {
// TODO: handle exception
} finally {
//完成业务逻辑以后释放锁
lock.unlock();
}
// ……业务代码
logger.info("insert into DB使用id:=======================>" + orderCode);
}
public void run() {
try {
// 等待其他线程初始化
cdl.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// 创建订单
createOrder();
}
public static void main(String[] args) {
for (int i = 1; i <= NUM; i++) {
// 按照线程数迭代实例化线程
new Thread(new OrderServiceImpl()).start();
// 创建一个线程,倒计数器减1
cdl.countDown();
}
}
}
工具类:
import java.text.SimpleDateFormat;
import java.util.Date;
public class OrderCodeGenerator {
// 自增长序列
private static int i = 0;
// 按照“年-月-日-小时-分钟-秒-自增长序列”的规则生成订单编号
public String getOrderCode() {
Date now = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
return sdf.format(now) + ++i;
}
}
二.新建一个zookeeper配置中心类,从zookeeper动态获取数据库配置
import java.util.List;
import java.util.Properties;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.web.context.ContextLoader;
import org.springframework.web.context.WebApplicationContext;
import com.zaxxer.hikari.HikariDataSource;
/**
*
* @Description: zookeeper配置中心类,从zookeeper动态获取数据库配置
* @author leeSmall
* @date 2018年9月10日
*
*/
public class ZookeeperConfigurerCentral {
//curator客户端
private CuratorFramework zkClient;
//curator事件监听
private TreeCache treeCache;
//zookeeper的ip和端口
private String zkServers;
//zookeeper上的/Jdbc路径
private String zkPath;
//超时设置
private int sessionTimeout;
//读取zookeeper上的数据库配置文件放到这里
private Properties props;
public ZookeeperConfigurerCentral(String zkServers, String zkPath, int sessionTimeout) {
this.zkServers = zkServers;
this.zkPath = zkPath;
this.sessionTimeout = sessionTimeout;
this.props = new Properties();
//初始化curator客户端
initZkClient();
//从zookeeper的Jdbc节点下获取数据库配置存入props
getConfigData();
//对zookeeper上的数据库配置文件所在节点进行监听,如果有改变就动态刷新props
addZkListener();
}
//初始化curator客户端
private void initZkClient() {
zkClient = CuratorFrameworkFactory.builder().connectString(zkServers).sessionTimeoutMs(sessionTimeout)
.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
zkClient.start();
}
//从zookeeper的Jdbc节点下获取数据库配置存入props
private void getConfigData() {
try {
List<String> list = zkClient.getChildren().forPath(zkPath);
for (String key : list) {
String value = new String(zkClient.getData().forPath(zkPath + "/" + key));
if (value != null && value.length() > 0) {
props.put(key, value);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//对zookeeper上的数据库配置文件所在节点进行监听,如果有改变就动态刷新props
private void addZkListener() {
TreeCacheListener listener = new TreeCacheListener() {
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
if (event.getType() == TreeCacheEvent.Type.NODE_UPDATED) {
getConfigData();
WebApplicationContext ctx = ContextLoader.getCurrentWebApplicationContext();
HikariDataSource dataSource = (HikariDataSource) ctx.getBean("dataSource");
System.out.println("================"+props.getProperty("url"));
dataSource.setJdbcUrl(props.getProperty("url"));
dataSource.setUsername(props.getProperty("uname"));
dataSource.setPassword(props.getProperty("password "));
dataSource.setDriverClassName(props.getProperty("driver "));
}
}
};
treeCache = new TreeCache(zkClient, zkPath);
try {
treeCache.start();
treeCache.getListenable().addListener(listener);
} catch (Exception e) {
e.printStackTrace();
}
}
public Properties getProps() {
return props;
}
public void setZkServers(String zkServers) {
this.zkServers = zkServers;
}
public void setZkPath(String zkPath) {
this.zkPath = zkPath;
}
public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
}
新建一个加载props里面的数据库配置的类
import java.util.Properties;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
/**
*
* @Description: 加载props里面的数据库配置,这个类等价于以前在xml文件里面的配置:
* <context:property-placeholder location="classpath:config/jdbc_conf.properties"/>
* @author leeSmall
* @date 2018年9月10日
*
*/
public class ZookeeperPlaceholderConfigurer extends PropertyPlaceholderConfigurer {
private ZookeeperConfigurerCentral zkConfigurerCentral;
@Override
protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props)
throws BeansException {
System.out.println(zkConfigurerCentral.getProps());
super.processProperties(beanFactoryToProcess, zkConfigurerCentral.getProps());
}
public void setzkConfigurerCentral(ZookeeperConfigurerCentral zkConfigurerCentral) {
this.zkConfigurerCentral = zkConfigurerCentral;
}
}