本文介绍: 由于synchronized跟ReetrantLock是JVM级别的锁,在分布式情况下失效,这时候我们通常会选择redisson基于redis封装好的分布式锁。下面我们一起来分析以下redisson的源码。

由于synchronized跟ReetrantLock是JVM级别的锁,在分布式情况下失效,这时候我们通常会选择redisson基于redis封装好的分布式锁。下面我们一起来分析以下redisson的源码。

使用方式

在这里插入图片描述

流程

在这里插入图片描述

getLock源码

在这里插入图片描述

在这里插入图片描述

  • 给命令执行器赋值
  • 给看门狗时间赋值,默认30秒
  • 给发布订阅器赋值

在这里插入图片描述

-生成UUID

tryLock源码

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        //获取当前线程ID
        long threadId = Thread.currentThread().getId();
        //尝试获取锁,成功返回null,失败返回剩余等待时间
        Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
        if (ttl == null) {
        //获取锁成功,返回true
            return true;
        } else {
        	//剩余等待时间
            time -= System.currentTimeMillis() - current;
            if (time <= 0L) {
            //获取锁超时失败
                this.acquireFailed(waitTime, unit, threadId);
                return false;
            } else {
                current = System.currentTimeMillis();
                //当前线程订阅频道redisson_lock__channel加锁的名字
    }
                CompletableFuture subscribeFuture = this.subscribe(threadId);

                try {
                //等待
                    subscribeFuture.get(time, TimeUnit.MILLISECONDS);
                } catch (TimeoutException | ExecutionException var20) {
                //超时取消订阅
                    if (!subscribeFuture.cancel(false)) {
                        subscribeFuture.whenComplete((res, ex) -> {
                            if (ex == null) {
                                this.unsubscribe(res, threadId);
                            }

                        });
                    }
					//获取失败
                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                try {
                //进入到这里说明订阅到消息
                    time -= System.currentTimeMillis() - current;
                    if (time <= 0L) {
                    //超时
                        this.acquireFailed(waitTime, unit, threadId);
                        boolean var22 = false;
                        return var22;
                    } else {
                        boolean var16;
                        //循环获取锁直到超时或成功
                        do {
                            long currentTime = System.currentTimeMillis();
                            //尝试获取锁
                            ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                            if (ttl == null) {
                                var16 = true;
                                return var16;
                            }

                            time -= System.currentTimeMillis() - currentTime;
                            if (time <= 0L) {
                                this.acquireFailed(waitTime, unit, threadId);
                                var16 = false;
                                return var16;
                            }

                            currentTime = System.currentTimeMillis();
                            if (ttl >= 0L && ttl < time) {
                                ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                            } else {
                                ((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture)).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                            }

                            time -= System.currentTimeMillis() - currentTime;
                        } while(time > 0L);

                        this.acquireFailed(waitTime, unit, threadId);
                        var16 = false;
                        return var16;
                    }
                } finally {
                //取消订阅
                    this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);
                }
            }
        }
    }

tryAcquire方法

在这里插入图片描述
在这里插入图片描述

-执行lua脚本尝试获取锁,成功获取锁或者锁重入返回nil,失败返回锁的剩余时间

 <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, command,
         "if (redis.call('exists', KEYS[1]) == 0) 
         then redis.call('hincrby', KEYS[1], ARGV[2], 1); 
         redis.call('pexpire', KEYS[1], ARGV[1]); 
         return nil; 
         end; 
         if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 
         then redis.call('hincrby', KEYS[1], ARGV[2], 1); 
         redis.call('pexpire', KEYS[1], ARGV[1]); 
         return nil;
          end; 
          return redis.call('pttl', KEYS[1]);"
          , Collections.singletonList(this.getRawName()), new Object[]{unit.toMillis(leaseTime), this.getLockName(threadId)});
    }
  • 执行lua脚本,它首先检查锁是否存在,如果不存在则创建锁并设置过期时间;如果锁已经存在,则尝试获取锁并将锁的计数器加一;最后返回锁的剩余生存时间。
  • 看门狗

在这里插入图片描述
在这里插入图片描述

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
        return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, 
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) 
        then 
        redis.call('pexpire', KEYS[1], ARGV[1]); 
        return 1; 
        end; 
        return 0;"
        , 
        Collections.singletonList(this.getRawName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }
  • 当前线程持有锁,刷新锁的有效时间,递归开启延迟任务继续刷新锁的有效时间即看门狗
  • 当前线程不持有锁时,lua脚本返回0,取消看门狗

释放锁

在这里插入图片描述
在这里插入图片描述

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return this.evalWriteAsync(this.getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "
        if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) 
        then 
        return nil;
        end; 
        local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
        if (counter > 0) 
        then 
        redis.call('pexpire', KEYS[1], ARGV[2]); 
        return 0; 
        else redis.call('del', KEYS[1]); 
        redis.call('publish', KEYS[2], ARGV[1]); 
        return 1; 
        end; 
        return nil;"
        , 
        Arrays.asList(this.getRawName(), this.getChannelName()), new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
    }
  • lua脚本减少锁的重入次数,如果为0,删除锁并发布消息到频道

原文地址:https://blog.csdn.net/weixin_53067547/article/details/136008620

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_67809.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注