分布式锁实现机制
介绍说明
解决思路
分布式场景下之所以不能够使用并发包下的锁解决并发问题,那是因为多节点是每个应用都有相互独立的进程,他们没有共享内存资源内存因此很难控制并发。
要想控制分布式应用并发问题
常用的分布式锁实现策略
1、基于数据库(mysql)的表锁(比如对某个插入字段orderId作唯一索引约束)或增加一个版本号字段乐观锁实现
基于mysql锁表(唯一索引)
表结构如下
CREATE TABLE `order_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `order_id` int(11) DEFAULT NULL, `insert_time` datetime DEFAULT NULL, `order_type` varchar(10) DEFAULT NULL, `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `order_id` (`order_id`)//对order_id建立唯一索引 ) ENGINE=InnoDB DEFAULT CHARSET=utf8; //当多节点分布式场景下对同一个订单执行插入操作时,就可以利用数据库服务器提供的唯一索引来控制住该场景下的并发问题。
基于mysql的乐观锁,比如在表中增加一个version版本号来控制
表结构
CREATE TABLE `order_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `order_id` int(11) DEFAULT NULL, `insert_time` datetime DEFAULT NULL, `order_type` varchar(10) DEFAULT NULL, `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP, `version` int(11) DEFAULT NULL,//根据这个版本号来判断更新之前有没有其它线程对它更新过 PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8
示例代码
//1、先查询出该记录对应的版本号 int versionCode=jdbc.selectVersion(sql); //2、执行更新操作 //sql大概是这样的 update order_info set version=versionCode+1 where version=versionCode; jdbc.updateOrderInfo(sql); //如果update能够影响一条语句,那么说明它是占到资源更新成功,如果没有那么表示资源已经被别的线程占用了。 //每次插入或更新的时候先检查查出的version是否发生了变化。
备注:
2、基于内存服务器的原子性操作或compare and set(检查比较再赋值),比如memcached/redis缓存服务器实现
示例代码
memcached实现的锁工具类
/** * 尝试获得锁资源 * * @param id * @return */ public static boolean tryLock(String id) { MemcachedClient memcachedClient = MemcachedUtils.getMemcachedClient(); try { return memcachedClient.add(id, 100, "value"); //最好给锁资源设置一个有效时间 } catch (Exception e) { e.printStackTrace(); } return false; } /** * 尝试释放锁资源 * * @param id * @return */ public static boolean releaseLock(String id) { MemcachedClient memcachedClient = MemcachedUtils.getMemcachedClient(); try { return memcachedClient.delete(id); } catch (Exception e) { e.printStackTrace(); } return false; }
使用方式AppMain
String id = "xxx"; try { boolean getLock = MemcachedLock.tryLock(id); if (getLock) {//如果获得锁资源 // doSomething()... } } finally { MemcachedLock.releaseLock(id); //不论执行业务执行成功与否都要释放资源 } //使用锁一定要将代码块放入try..catch..finally代码块中
redis等实现分布式锁大致相同。
备注:
3、基于消息通信的协调服务的zookeeper实现
示例代码:
/** * 分布式锁方案实现 * Meilele.com Inc. * Copyright (c) 2008-2015 All Rights Reserved. * @author xuyi3 * @version $Id: DistributedSharedLock.java, v 0.1 2015年12月21日 下午1:46:47 xuyi3 Exp $ */ public class DistributedSharedLock implements Watcher { /** SLF4J日志*/ private static final Logger logger = LoggerFactory.getLogger(DistributedSharedLock.class); /** 连接zookeeper的服务器地址 */ private static final String ADDR = Configure.ZOO_KEEPER_XS; /** 创建子节点名称*/ private static final String LOCK_NODE = Configure.LOCK_NODE; /** 锁目录名称*/ private String rootLockNode = Configure.LOCK_PATH; // 锁目录 /** zookeeper对象*/ private ZooKeeper zk = null; /** 互斥标识*/ private Integer mutex; /** 当前锁标识*/ private Integer currentLock; /** * 构造函数实现 连接zk服务器 创建zk锁目录 * * @param rootLockNode */ public DistributedSharedLock() { try { // 连接zk服务器 zk = new ZooKeeper(ADDR, Configure.TIME_OUT, this); } catch (IOException e) { e.printStackTrace(); } mutex = new Integer(-1); if (zk != null) { try { // 建立根目录节点 Stat s = zk.exists(rootLockNode, false); if (s == null) { zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (KeeperException e) { logger.info("Keeper exception when instantiating queue:{}", e.toString()); } catch (InterruptedException e) { logger.info("InterruptedException{}", e.toString()); } } } /** * 请求zk服务器,获得锁 * 基于创建临时节点,节点最小的获得锁持有权的算法。 * @throws KeeperException * @throws InterruptedException */ public void acquire() throws KeeperException, InterruptedException { ByteBuffer b = ByteBuffer.allocate(4); byte[] value; b.putInt(ThreadLocalRandom.current().nextInt(10)); value = b.array(); // 创建锁节点 String lockName = zk.create(rootLockNode + "/" + LOCK_NODE, value, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); synchronized (mutex) { while (true) { // 获得当前锁节点的number,和所有的锁节点比较 Integer acquireLock = new Integer(lockName.substring(lockName.lastIndexOf('-') + 1)); List<String> childLockNode = zk.getChildren(rootLockNode, true); //将创建的子节点放入TreeSet中进行排序 SortedSet<Integer> sortedLock = new TreeSet<Integer>(); for (String temp : childLockNode) { Integer tempLockNumber = new Integer(temp.substring(temp.lastIndexOf('-') + 1)); sortedLock.add(tempLockNumber); } //获取最小节点,创建该节点的连接持有锁资源。 currentLock = sortedLock.first(); // 如果当前创建的锁的序号是最小的那么认为这个客户端获得了锁 if (currentLock >= acquireLock) { return; } else { // 没有获得锁则等待下次事件的发生 mutex.wait(); } } } } /** * 释放锁 * * @throws KeeperException * @throws InterruptedException */ public void release() throws KeeperException, InterruptedException { String lockName = String.format("%010d", currentLock); zk.delete(rootLockNode + "/" + LOCK_NODE + lockName, -1); //释放锁的时候应该主动关闭和zookeeper的连接 if (zk.getState() == States.CONNECTED) { zk.close(); } } /** * 监听器必须实现的方法 * */ public void process(WatchedEvent arg0) { synchronized (mutex) { mutex.notifyAll(); } } } //是基于zookeeper临时节点实现的分布式锁。
备注:
转载自:http://blog.csdn.net/nicewuranran/article/details/51730131