作者微信 bishe2022

代码功能演示视频在页面下方,请先观看;如需定制开发,联系页面右侧客服
Java实现基于Redis的分布式锁

Custom Tab

 单JVM内同步好办, 直接用JDK提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用Redis,当然还有很多其他的实现方式。其实基于Redis实现的原理还算比较简单的,在看代码之前建议大家先去这里看看原理,我就不翻译了,免得变味了,看懂了之后看代码应该就容易理解了。

 

时间统一问题:各个客户端加锁时需要获取时间,而这个时间都不应当从本地获取,因为各个客户端的时间并不是一致的,因此需要提供一个TimeServer提供获取时间的服务,下面源码中用到的关于时间服务的三个类(包括TimeServer、TimeClient和Time Client Exception)会在另一篇博客《Java NIO时间服务》给源码.

 

我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition()方法我这里暂时没实现。这个Lock提供了5个lock方法的变体,可以自行选择使用哪一个来获取锁,我的想法是

最好用带超时返回的那几个方法,因为不这样的话,假如redis挂了,线程永远都在那死循环了(关于这里,应该还可以进一步优化,如果redis挂了,Jedis的操作肯定会抛异常之类的,可以定义个机制让redis挂了的时候通知使用这个lock的用户,或者说是线程)。

 

 

Java代码  收藏代码

package cc.lixiaohui.lock;  
  
import java.util.concurrent.TimeUnit;  
  
public interface Lock extends Releasable{  
  
    /** 
     * 阻塞性的获取锁, 不响应中断 
     */  
    void lock();  
      
    /** 
     * 阻塞性的获取锁, 响应中断 
     *  
     * @throws InterruptedException 
     */  
    void lockInterruptibly() throws InterruptedException;  
      
    /** 
     * 尝试获取锁, 获取不到立即返回, 不阻塞 
     */  
    boolean tryLock();  
      
    /** 
     * 超时自动返回的阻塞性的获取锁, 不响应中断 
     *  
     * @param time 
     * @param unit 
     * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁 
     *          
     */  
    boolean tryLock(long time, TimeUnit unit);  
      
    /** 
     * 超时自动返回的阻塞性的获取锁, 响应中断 
     *  
     * @param time 
     * @param unit 
     * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁 
     * @throws InterruptedException 在尝试获取锁的当前线程被中断 
     */  
    boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;  
      
    /** 
     * 释放锁 
     */  
    void unlock();  
      
}

Releasable.java :

Java代码  收藏代码

package cc.lixiaohui.lock;  
  
/** 
 * 代表持有资源的对象, 例如 
 * <ul> 
 * <li> 基于jedis的锁自然持有与redis server的连接 </li> 
 * <li> 基于时间统一的的锁自然持有与time server的连接</li> 
 * </ul> 
 * 因此锁应该实现该接口, 并在{@link Releasable#resease() release} 方法中释放相关的连接 
 *  
 * @author lixiaohui 
 * 
 */  
public interface Releasable {  
      
    /** 
     * 释放持有的所有资源 
     */  
    void release();  
      
}

看Lock的抽象实现:

 

Java代码  收藏代码

package cc.lixiaohui.lock;  
  
import java.util.concurrent.TimeUnit;  
  
/** 
 * 锁的骨架实现, 真正的获取锁的步骤由子类去实现. 
 *  
 * @author lixiaohui 
 * 
 */  
public abstract class AbstractLock implements Lock {  
  
    /** 
     * <pre> 
     * 这里需不需要保证可见性值得讨论, 因为是分布式的锁,  
     * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性  
     * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了. 
     * </pre> 
     */  
    protected volatile boolean locked;  
  
    /** 
     * 当前jvm内持有该锁的线程(if have one) 
     */  
    private Thread exclusiveOwnerThread;  
  
    public void lock() {  
        try {  
            lock(false, 0, null, false);  
        } catch (InterruptedException e) {  
            // TODO ignore  
        }  
    }  
  
    public void lockInterruptibly() throws InterruptedException {  
        lock(false, 0, null, true);  
    }  
  
    public boolean tryLock(long time, TimeUnit unit) {  
        try {  
            return lock(true, time, unit, false);  
        } catch (InterruptedException e) {  
            // TODO ignore  
        }  
        return false;  
    }  
  
    public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {  
        return lock(true, time, unit, true);  
    }  
  
    public void unlock() {  
        // TODO 检查当前线程是否持有锁  
        if (Thread.currentThread() != getExclusiveOwnerThread()) {  
            throw new IllegalMonitorStateException("current thread does not hold the lock");  
        }  
          
        unlock0();  
        setExclusiveOwnerThread(null);  
    }  
  
    protected void setExclusiveOwnerThread(Thread thread) {  
        exclusiveOwnerThread = thread;  
    }  
  
    protected final Thread getExclusiveOwnerThread() {  
        return exclusiveOwnerThread;  
    }  
  
    protected abstract void unlock0();  
      
    /** 
     * 阻塞式获取锁的实现 
     *  
     * @param useTimeout  
     * @param time 
     * @param unit 
     * @param interrupt 是否响应中断 
     * @return 
     * @throws InterruptedException 
     */  
    protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;  
  
}
 基于Redis的最终实现(not reentrant),关键的获取锁,释放锁的代码在这个类的lock方法和unlock0方法里,
 大家可以只看这两个方法然后完全自己写一个:
 
Java代码
package cc.lixiaohui.lock;  
  
import java.io.IOException;  
import java.net.SocketAddress;  
import java.util.concurrent.TimeUnit;  
  
import redis.clients.jedis.Jedis;  
import cc.lixiaohui.lock.time.nio.client.TimeClient;  
  
/** 
 * <pre> 
 * 基于Redis的SETNX操作实现的分布式锁 
 *  
 * 获取锁时最好用lock(long time, TimeUnit unit), 以免网路问题而导致线程一直阻塞 
 *  
 * <a href="http://redis.io/commands/setnx">SETNX操作参考资料</a> 
 * </pre> 
 *  
 * @author lixiaohui 
 * 
 */  
public class RedisBasedDistributedLock extends AbstractLock {  
      
    private Jedis jedis;  
      
    private TimeClient timeClient;  
      
    // 锁的名字  
    protected String lockKey;  
      
    // 锁的有效时长(毫秒)  
    protected long lockExpires;  
      
    public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires, SocketAddress timeServerAddr) throws IOException {  
        this.jedis = jedis;  
        this.lockKey = lockKey;  
        this.lockExpires = lockExpires;  
        timeClient = new TimeClient(timeServerAddr);  
    }  
      
    // 阻塞式获取锁的实现  
    protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{  
        if (interrupt) {  
            checkInterruption();  
        }  
          
        // 超时控制 的时间可以从本地获取, 因为这个和锁超时没有关系, 只是一段时间区间的控制  
        long start = localTimeMillis();  
        long timeout = unit.toMillis(time); // if !useTimeout, then it's useless  
          
        while (useTimeout ? isTimeout(start, timeout) : true) {  
            if (interrupt) {  
                checkInterruption();  
            }  
              
            long lockExpireTime = serverTimeMillis() + lockExpires + 1;//锁超时时间  
            String stringOfLockExpireTime = String.valueOf(lockExpireTime);  
              
            if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁  
                // TODO 成功获取到锁, 设置相关标识  
                locked = true;  
                setExclusiveOwnerThread(Thread.currentThread());  
                return true;  
            }  
              
            String value = jedis.get(lockKey);  
            if (value != null && isTimeExpired(value)) { // lock is expired  
                // 假设多个线程(非单jvm)同时走到这里  
                String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic  
                // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)  
                // 加入拿到的oldValue依然是expired的,那么就说明拿到锁了  
                if (oldValue != null && isTimeExpired(oldValue)) {  
                    // TODO 成功获取到锁, 设置相关标识  
                    locked = true;  
                    setExclusiveOwnerThread(Thread.currentThread());  
                    return true;  
                }  
            } else {   
                // TODO lock is not expired, enter next loop retrying  
            }  
        }  
        return false;  
    }  
      
    public boolean tryLock() {  
        long lockExpireTime = serverTimeMillis() + lockExpires + 1;//锁超时时间  
        String stringOfLockExpireTime = String.valueOf(lockExpireTime);  
          
        if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁  
            // TODO 成功获取到锁, 设置相关标识  
            locked = true;  
            setExclusiveOwnerThread(Thread.currentThread());  
            return true;  
        }  
          
        String value = jedis.get(lockKey);  
        if (value != null && isTimeExpired(value)) { // lock is expired  
            // 假设多个线程(非单jvm)同时走到这里  
            String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic  
            // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)  
            // 假如拿到的oldValue依然是expired的,那么就说明拿到锁了  
            if (oldValue != null && isTimeExpired(oldValue)) {  
                // TODO 成功获取到锁, 设置相关标识  
                locked = true;  
                setExclusiveOwnerThread(Thread.currentThread());  
                return true;  
            }  
        } else {   
            // TODO lock is not expired, enter next loop retrying  
        }  
          
        return false;  
    }  
      
    /** 
     * Queries if this lock is held by any thread. 
     *  
     * @return {@code true} if any thread holds this lock and 
     *         {@code false} otherwise 
     */  
    public boolean isLocked() {  
        if (locked) {  
            return true;  
        } else {  
            String value = jedis.get(lockKey);  
            // TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了,  
            // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断  
            // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就  
            // 不是同步控制, 它只是一种锁状态的报告.  
            return !isTimeExpired(value);  
        }  
    }  
  
    @Override  
    protected void unlock0() {  
        // TODO 判断锁是否过期  
        String value = jedis.get(lockKey);  
        if (!isTimeExpired(value)) {  
            doUnlock();  
        }  
    }  
      
    public void release() {  
        jedis.close();  
        timeClient.close();  
    }  
      
    private void doUnlock() {  
        jedis.del(lockKey);  
    }  
  
    private void checkInterruption() throws InterruptedException {  
        if(Thread.currentThread().isInterrupted()) {  
            throw new InterruptedException();  
        }  
    }  
      
    private boolean isTimeExpired(String value) {  
        // 这里拿服务器的时间来比较  
        return Long.parseLong(value) < serverTimeMillis();  
    }  
      
    private boolean isTimeout(long start, long timeout) {  
        // 这里拿本地的时间来比较  
        return start + timeout > System.currentTimeMillis();  
    }  
      
    private long serverTimeMillis(){  
        return timeClient.currentTimeMillis();  
    }  
      
    private long localTimeMillis() {  
        return System.currentTimeMillis();  
    }  
      
}

如果将来还换一种实现方式(比如memcached,数据库之类的),到时直接继承AbstractLock并实现lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt), unlock0()方法即可(所谓抽象嘛)

 

测试

 

模拟全局ID增长器,设计一个IDGenerator类,该类负责生成全局递增ID,其代码如下:

 

Java代码  收藏代码

package cc.lixiaohui.lock.example;  
  
import java.math.BigInteger;  
import java.util.concurrent.TimeUnit;  
  
import cc.lixiaohui.lock.Lock;  
import cc.lixiaohui.lock.Releasable;  
  
/** 
 * 模拟分布式环境中的ID生成  
 * @author lixiaohui 
 * 
 */  
public class IDGenerator implements Releasable{  
  
    private static BigInteger id = BigInteger.valueOf(0);  
  
    private final Lock lock;  
  
    private static final BigInteger INCREMENT = BigInteger.valueOf(1);  
  
    public IDGenerator(Lock lock) {  
        this.lock = lock;  
    }  
      
    public String getAndIncrement() {  
        if (lock.tryLock(3, TimeUnit.SECONDS)) {  
            try {  
                // TODO 这里获取到锁, 访问临界区资源  
                System.out.println(Thread.currentThread().getName() + " get lock");  
                return getAndIncrement0();  
            } finally {  
                lock.unlock();  
            }  
        }  
        return null;  
        //return getAndIncrement0();  
    }  
      
    public void release() {  
        lock.release();  
    }  
  
    private String getAndIncrement0() {  
        String s = id.toString();  
        id = id.add(INCREMENT);  
        return s;  
    }  
}

 

测试主逻辑:同一个JVM内开两个线程死循环地(循环之间无间隔,有的话测试就没意义了)获取ID(我这里并不是死循环而是跑20s),获取到ID存到同一个Set里面,在存之前先检查该ID在set中是否存在,如果已存在,则让两个线程都停止。如果程序能正常跑完20s,那么说明这个分布式锁还算可以满足要求,如此测试的效果应该和不同JVM(也就是真正的分布式环境中)测试的效果是一样的,下面是测试类的代码:

 

Java代码  收藏代码

package cc.lixiaohui.DistributedLock.DistributedLock;  
  
import java.net.InetSocketAddress;  
import java.net.SocketAddress;  
import java.util.HashSet;  
import java.util.Set;  
  
import org.junit.Test;  
  
import redis.clients.jedis.Jedis;  
import cc.lixiaohui.lock.Lock;  
import cc.lixiaohui.lock.RedisBasedDistributedLockV0_0;  
import cc.lixiaohui.lock.RedisBasedDistributedLock;  
import cc.lixiaohui.lock.example.IDGenerator;  
  
public class IDGeneratorTest {  
      
    private static Set<String> generatedIds = new HashSet<String>();  
      
    private static final String LOCK_KEY = "lock.lock";  
    private static final long LOCK_EXPIRE = 5 * 1000;  
      
    @Test  
    public void testV1_0() throws Exception {  
          
        SocketAddress addr = new InetSocketAddress("localhost", 9999);  
          
        Jedis jedis1 = new Jedis("localhost", 6379);  
        Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE, addr);  
        IDGenerator g1 = new IDGenerator(lock1);  
        IDConsumeTask consume1 = new IDConsumeTask(g1, "consume1");  
          
        Jedis jedis2 = new Jedis("localhost", 6379);  
        Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE, addr);  
        IDGenerator g2 = new IDGenerator(lock2);  
        IDConsumeTask consume2 = new IDConsumeTask(g2, "consume2");  
          
        Thread t1 = new Thread(consume1);  
        Thread t2 = new Thread(consume2);  
        t1.start();  
        t2.start();  
          
        Thread.sleep(20 * 1000); //让两个线程跑20秒  
          
        IDConsumeTask.stopAll();  
          
        t1.join();  
        t2.join();  
    }  
      
    static String time() {  
        return String.valueOf(System.currentTimeMillis() / 1000);  
    }  
      
    static class IDConsumeTask implements Runnable {  
  
        private IDGenerator idGenerator;  
          
        private String name;  
          
        private static volatile boolean stop;  
          
        public IDConsumeTask(IDGenerator idGenerator, String name) {  
            this.idGenerator = idGenerator;  
            this.name = name;  
        }  
          
        public static void stopAll() {  
            stop = true;  
        }  
          
        public void run() {  
            System.out.println(time() + ": consume " + name + " start ");  
            while (!stop) {  
                String id = idGenerator.getAndIncrement();  
                if (id != null) {  
                    if(generatedIds.contains(id)) {  
                        System.out.println(time() + ": duplicate id generated, id = " + id);  
                        stop = true;  
                        continue;  
                    }   
                      
                    generatedIds.add(id);  
                    System.out.println(time() + ": consume " + name + " add id = " + id);  
                }  
            }  
            // 释放资源  
            idGenerator.release();  
            System.out.println(time() + ": consume " + name + " done ");  
        }  
          
    }  
      
}

说明一点,我这里停止两个线程的方式并不是很好,我是为了方便才这么做的,因为只是测试,最好不要这么做。

 

测试结果

跑20s打印的东西太多,前面打印的被clear了,只有差不多跑完的时候才有,下面截图。说明了这个锁能正常工作:

 

a69f80f0-2f86-3b56-be37-730995e6613d.png

当IDGererator没有加锁(即IDGererator的getAndIncrement方法内部获取id时不上锁)时,测试是不通过的,非常大的概率中途就会停止,下面是不加锁时的测试结果:

 

这个1秒都不到:

AAAAAAAAAAAAAAAAA.png

当IDGererator没有加锁(即IDGererator的getAndIncrement方法内部获取id时不上锁)时,测试是不通过的,非常大的概率中途就会停止,下面是不加锁时的测试结果:

 

BBBBBBBBBBBBBB.png

 

改进

Java 实现基于Redis的分布式可重入锁 中改进使其可重入

转载自:http://lixiaohui.iteye.com/blog/2320554

Home