日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

一、什么是分布式鎖

不同的進(jìn)程需要以互斥的方式來訪問共享資源,這里實(shí)現(xiàn)互斥就是分布式鎖。

簡(jiǎn)單來說就是:同一時(shí)間只有一個(gè)客戶端對(duì)共享資源操作。舉個(gè)實(shí)際例子,搶購(gòu)茅臺(tái),如果不加鎖就會(huì)發(fā)生超賣的事故。

Redis 分布式鎖詳解

 

二、實(shí)現(xiàn)分布式鎖需要注意的點(diǎn)

  1. 互斥性:在任何時(shí)刻,只有一個(gè)客戶端獲得鎖。
  2. 無死鎖:任何時(shí)候都能獲取鎖,即使客戶端崩潰或者或被分區(qū)。
  3. 正確性:解鈴還須系鈴人,客戶端 A 加的鎖只能由客戶端 A 解鎖,其他客戶端不能解鎖。
  4. 容錯(cuò):只要大部分 redis 節(jié)點(diǎn)處于運(yùn)行狀態(tài),客戶端就能夠獲取和釋放鎖。

三、Redis 分布式鎖原理

Redis 加鎖主要是使用 set (
https://redis.io/commands/set ) 命令操作:

SET key value [EX seconds|PX milliseconds|KEEPTTL][NX|XX] [GET]

  • EX -- 設(shè)置指定的過期時(shí)間,以秒為單位。
  • PX -- 設(shè)置指定的過期時(shí)間,以毫秒為單位。
  • NX -- 僅當(dāng)該鍵不存在的時(shí)才會(huì)設(shè)置該鍵。
  • XX -- 僅當(dāng)該鍵存在時(shí)才會(huì)設(shè)置該鍵。

加鎖命令: SET lock_key lock_value PX 10000 NX

只有當(dāng) lock_key 不存在時(shí)才會(huì)設(shè)置 lock_key 和 lock_value,超時(shí)時(shí)間 10000 毫秒,設(shè)置成功返回 OK:

Redis 分布式鎖詳解

 

當(dāng) lock_key 存在時(shí)返回 nil:

Redis 分布式鎖詳解

 

Redis 釋放鎖使用命令: DEL key (
https://redis.io/commands/del )

解鎖命令: DEL lock_key 。

Redis 在 2.6.12 之后的版本才加入 [EX seconds|PX milliseconds] [NX|XX] 這些參數(shù)

Redis 分布式鎖詳解

 

在此之前使用 SETNX (
https://redis.io/commands/setnx ) SETNX 是 “ SET if N ot e X ists” 的縮寫。

SETNX 返回 1 說明設(shè)置成功, 返回 0 說明設(shè)置失敗。

SETNX 和 EXPIRE 操作之間不是原子性的,如果 SETNX 執(zhí)行成功之后, 沒有執(zhí)行 EXPIRE 命令,就可能會(huì)發(fā)生死鎖。

Redis 官網(wǎng)聲明 SETNX 在將來的版本中可能會(huì)被棄用,因?yàn)?nbsp;SETNX 實(shí)現(xiàn)的功能 set 都能實(shí)現(xiàn)。

Redis 分布式鎖詳解

 

四、Redis 實(shí)現(xiàn)分布式鎖注意的點(diǎn)及解決方案

  1. 防死鎖

設(shè)置鎖和設(shè)置鎖的超時(shí)時(shí)間要保持原子性,這點(diǎn)很容易做到 使用 SET lock_key lock_value PX 10000 NX 命令即可, 不要使用 SETNX lock_key lock_value , EXPIRE lock_key 10 這些命令,因?yàn)樗麄冎g不是原子性的,有發(fā)生死鎖的風(fēng)險(xiǎn)。

  1. 合理設(shè)置鎖超時(shí)時(shí)間

鎖的超時(shí)時(shí)間要大于程序執(zhí)行的時(shí)間,否則多個(gè)客戶端可能同時(shí)獲取鎖。充分預(yù)估使用鎖的業(yè)務(wù)代碼執(zhí)行時(shí)間,該時(shí)間不宜過長(zhǎng)也不宜過短,過短,可能使鎖發(fā)生錯(cuò)誤;過長(zhǎng),客戶端異常時(shí)可能會(huì)影響執(zhí)行效率。

  1. 釋放鎖要及時(shí)

客戶端使用完共享資源之后要及時(shí)的釋放鎖,即使在程序發(fā)生異常,JAVA 中一般都是在 finally 里釋放鎖。

  1. 只能釋放自己加的鎖

在釋放鎖的時(shí)要確保這個(gè)鎖是自己的,不能將其他鎖釋放掉,這樣可能導(dǎo)致多個(gè)客戶端同時(shí)獲取鎖。可以通過判斷 lock_value 的值是否相等來判斷是否是自己加的鎖,lock_value 的值可以使用 UUID 或者任意確定唯一的值。

  1. 釋放鎖要保證原子性

客戶端在釋放鎖時(shí)分兩個(gè)步驟,一要比較鎖的值是否相等,二要?jiǎng)h除鎖( DEL key ),這兩個(gè)步驟要保證原子性,否則的話可能導(dǎo)致將其他鎖釋放掉,畫個(gè)圖解釋下:

Redis 分布式鎖詳解

 

  1. 客戶端 A 設(shè)置 lock_order 鎖成功,鎖值為 123uD,超時(shí)間為 10000ms。
  2. 客戶端 A 業(yè)務(wù)代碼執(zhí)行完成,釋放鎖前需要獲取 lock_order 鎖的值。
  3. 客戶端 A 判斷鎖值是否是 123uD,執(zhí)行緩慢。
  4. 客戶端 A 的鎖超時(shí)時(shí)間已到,Redis 自動(dòng)移除了鎖。
  5. 此時(shí)客戶端 B 設(shè)置鎖,lock_order 鎖不存在,所以加鎖成功。
  6. 客戶端 A 判斷鎖值相等,執(zhí)行 del 釋放鎖,此時(shí)客戶端 A 釋放的鎖是客戶端 B 的而不是自己的,鎖出現(xiàn)錯(cuò)誤。

這也好解決,Redis 提供了 EVAL (
https://redis.io/commands/eval ) 命令去解析 Lua 腳本,可以發(fā)一段 Lua 腳本給 Redis 執(zhí)行:

if redis.call("get",KEYS[1]) == ARGV[1] -- 判斷鎖的值是否相等。 KEYS[1], ARGV[1],是指?jìng)魅氲膮?shù),以上面為例,KEYS[1] 指的是 lock_order,ARGV[1] 指的是 123uD, 
then
    return redis.call("del",KEYS[1])    -- 刪除這個(gè) key,返回刪除 key 的個(gè)數(shù)
else
    return 0                            -- 鎖值不相等返回 0
end

復(fù)制代碼

這樣就可以保證原子執(zhí)行了。

五、基于Set命令實(shí)現(xiàn) Redis 分布式鎖

基于 Redisson 客戶端實(shí)現(xiàn) Redis 分布式鎖:

/**
 * 加鎖利用 set(key, value, "PX", "NX") 函數(shù)實(shí)現(xiàn)
 * 解鎖利用 Lua 腳本實(shí)現(xiàn)
 * <p>
 * Created by jie.li on 2021/1/4 7:50 下午
 */
@Component
public class RedisLock1 {


    @Resource
    private RedissonClient redissonClient;


    /**
     * 嘗試加鎖
     *
     * @param name  lock name
     * @param value lock value
     * @return true 加鎖成功, false 加鎖失敗
     */
    public boolean tryLock(String name, String value) {
        RBucket<Object> bucket = redissonClient.getBucket(name);
        // 執(zhí)行的是 set(key, value, "PX", "NX") 命令
        return bucket.trySet(value, 10000, TimeUnit.MILLISECONDS);
    }


    /**
     * 解鎖
     *
     * @param name  lock name
     * @param value lock value
     */
    public void unLock(String name, String value) {
        redissonClient.getScript().eval(RScript.Mode.READ_WRITE, DEL_LOCK_SCRIPT, RScript.ReturnType.INTEGER, Collections.singletonList(name), value);
    }


    // 解鎖腳本
    private static final String DEL_LOCK_SCRIPT =
            "if redis.call("get",KEYS[1]) == ARGV[1] then" +      // 如果 KEYS[1] 對(duì)應(yīng)的 Value 值等于 ARGV[1]
                    " return redis.call("del",KEYS[1])" +         // 刪除 KEYS[1]
                    " else" +                                       // 否則
                    " return 0" +                                   // 返回 0
                    " end;";
}

復(fù)制代碼

測(cè)試代碼:

/**
 * 測(cè)試手動(dòng)加鎖解鎖
 * <p>
 * Created by jie.li on 2021/1/7 2:54 下午
 */
@Service
public class RedisLockTestService {


    @Resource
    private RedisLock1 redisLock1;


    private int i = 50;


    /**
     * 測(cè)試手動(dòng)實(shí)現(xiàn) redis 分布式鎖
     *
     * @return int
     */
    public int biz() {
        String lockName = "redis:lock:1";
        String lockValue = UUID.randomUUID().toString();
        try {
            boolean b = redisLock1.tryLock(lockName, lockValue);
            if (b) {
                if (i > 0) {
                    i--;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            redisLock1.unLock(lockName, lockValue);
        }
        return i;
    }
}

復(fù)制代碼

@Test
public void testBiz() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(200);
    for (int i = 0; i < 200; i++) {
        new Thread(() -> {
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int i1 = redisLockTestService.biz();
            System.out.println(Thread.currentThread().getName() + " -> " + i1);
        }, "Thread" + i).start();


        countDownLatch.countDown();
    }


    TimeUnit.SECONDS.sleep(5);
}

復(fù)制代碼

六、Redisson 實(shí)現(xiàn)分布式鎖

1. Redisson 實(shí)現(xiàn)鎖簡(jiǎn)介

Redisson 實(shí)現(xiàn)的分布式鎖相對(duì)于我們自己實(shí)現(xiàn)的鎖更加完善,主要有以下兩點(diǎn):

1、可重入

2、鎖重試

3、鎖自動(dòng)延期(看門狗機(jī)制)

Redisson 鎖的依賴圖:

Redis 分布式鎖詳解

 

Redisson 實(shí)現(xiàn)了很多種類型的鎖,所有的鎖都實(shí)現(xiàn)了 JUC 中的 Lock 接口,并且做了擴(kuò)展( RLock ), 所以使用方法和使用 ReentrantLock 差不多。這里我們只針對(duì) RedissonLock 進(jìn)行講解。

2. Redisson 源碼解析

嘗試加鎖

// waitTime  等待獲取鎖的時(shí)間
// leaseTime 鎖的有效期
// unit      使用的時(shí)間單位
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 1、嘗試加鎖,如果當(dāng)前有鎖,返回鎖的剩余時(shí)間ttl,否則返回空
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // lock acquired
    // 2、加鎖成功,返回true
    if (ttl == null) {
        return true;
    }
    // 剩余的等待時(shí)間 waitTime
    time -= System.currentTimeMillis() - current;
    // 剩余等待時(shí)間已過
    if (time <= 0) {
        // 獲取鎖失敗
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    
    current = System.currentTimeMillis();
    // 3、訂閱鎖釋放事件。利用semaphore(信號(hào)量),訂閱(Redis 發(fā)布訂閱)鎖的釋放事件,
    // 鎖釋放后立即通知等待的線程競(jìng)爭(zhēng)獲取鎖。 
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    // 4、線程阻塞
    // - 返回 false: 阻塞時(shí)間已經(jīng)超過了剩余等待時(shí)間(waitTime),取消訂閱事件,加鎖失敗
    // - 返回 ture:  繼續(xù)嘗試加鎖
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.onComplete((res, e) -> {
                if (e == null) {
                    unsubscribe(subscribeFuture, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
    }


    try {
        time -= System.currentTimeMillis() - current;
        // 剩余等待時(shí)間已過,加鎖失敗
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        // 5、繼續(xù)以同樣的方式獲加鎖,如果過了最大的等待加鎖時(shí)間,則加鎖失敗,返回false
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }


            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }


            // waiting for message
            currentTime = System.currentTimeMillis();
            // 6、通過信號(hào)量(共享鎖)阻塞,等待釋放鎖消息
            // 鎖剩余時(shí)間小于剩余的waitTime時(shí)間
            if (ttl >= 0 && ttl < time) {
                // 非阻塞的獲取結(jié)果,獲得信號(hào)量,在給定的時(shí)間內(nèi)從信號(hào)量獲取一個(gè)許可。
                subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
            // 7、剩余的waitTime
            time -= System.currentTimeMillis() - currentTime;
            // 加鎖最大等待時(shí)間已過,加鎖失敗,返回false
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        // 取消訂閱事件
        unsubscribe(subscribeFuture, threadId);
    }
}

復(fù)制代碼

tryLock 方法參數(shù)說明:

嘗試加鎖方法 tryLock ,兩個(gè)重要的入?yún)?waitTime、leaseTime:

  • waitTime: 嘗試加鎖的最大時(shí)間,如果在這個(gè)時(shí)間內(nèi)一直沒有加鎖成功,則返回 false。
  • leaseTime: 鎖的有效期,如果客戶端(進(jìn)程)在這個(gè)時(shí)間內(nèi)沒有釋放鎖,則 Redis 主動(dòng)釋放,當(dāng)然 Redisson 看門狗的機(jī)制會(huì)將這個(gè)時(shí)間延期,后面會(huì)說到。

流程總結(jié):

  1. 嘗試加鎖 tryAcquire ,如果加鎖成功則返回 null, 如果鎖被占用,則返回鎖的剩余時(shí)間 ttl。
  2. 如果加鎖成功返回 true,否在判斷 waitTime 是否過期,過期則加鎖失敗返回 false。
  3. 基于信號(hào)量,通過 Redis 的發(fā)布訂閱,訂閱鎖的釋放事件,一旦鎖釋放會(huì)立即通知等待的線程去競(jìng)爭(zhēng)鎖。
  4. 線程阻塞剩余 waitTime 時(shí)間,來等待鎖釋放的通知,如果阻塞時(shí)間超過了剩余 waitTime 時(shí)間,則取消任務(wù),取消任務(wù)成功再取消訂閱信息,加鎖失敗返回 false;否則在剩余 waitTime 時(shí)間內(nèi)等到了鎖釋放通知,則進(jìn)入循環(huán)加鎖階段。
  5. 循環(huán)中繼續(xù)以同樣的方式加鎖,如果在剩余 waitTime 內(nèi)加鎖成功返回 true,否在加鎖失敗返回 false。
  6. 如果在剩余 waitTime 時(shí)間內(nèi),鎖還是被其他的客戶端(進(jìn)程)持有,阻塞指定時(shí)間(持有鎖的剩余過期時(shí)間和剩余 waitTime 時(shí)間)等待鎖的釋放消息。
  7. 具體實(shí)現(xiàn):利用信號(hào)量(semaphore)阻塞當(dāng)前線程獲取許可,如果有可用許可則繼續(xù)嘗試加鎖,如果沒有可用許可則阻塞給定的時(shí)間,直至其他線程釋放鎖,調(diào)用 release() 方法增加許可,或者其它某些線程中斷當(dāng)前線程,或者已超出指定的等待時(shí)間。
  8. 如果剩余 waitTime 過期,加鎖失敗返回 false。

加鎖

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
      internalLockLeaseTime = unit.toMillis(leaseTime);


      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +  // 1、如果 Redis 中不存在這個(gè) key
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +  // 2、設(shè)置 key 和 field, 并將 value 的值設(shè)置為 1
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +  // 3、設(shè)置 key 的過期時(shí)間
                "return nil; " +  // 4、返回 null
            "end; " +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  // 5、如果 Redis 中存在對(duì)應(yīng)的 key 和 field 
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  // 6、則將對(duì)應(yīng)的 key 和 field 對(duì)應(yīng)的 value 自增 1
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +  // 7、設(shè)置 key 的過期時(shí)間
                "return nil; " +  // 8、返回 null
            "end; " +
            "return redis.call('pttl', KEYS[1]);",  // 9、返回剩余生存時(shí)間, 單位毫秒
              // 以下這三個(gè)參數(shù)分別對(duì)應(yīng) Lua 腳本中的 KEYS[1], ARGV[1], ARGV[2]
              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

復(fù)制代碼

Redisson 中實(shí)際加鎖的代碼,流程總結(jié):

  1. 如果 Redis 中不存在 key。
  2. 則使用 hset 這個(gè)命令設(shè)置 key 和 field,并將 hash 的 value 設(shè)置為 1,這里使用 Redis 中的 hash 數(shù)據(jù)結(jié)構(gòu), value 的值用于支持可重入鎖,記錄加鎖次數(shù)。
  3. 設(shè)置 key 的過期時(shí)間。
  4. 加鎖成功返回 null。
  5. 如果 Redis 中存在對(duì)應(yīng)的 key 和 field。
  6. 將對(duì)應(yīng)的 key 和 field 對(duì)應(yīng)的 value 值自增 1,記錄重入鎖的次數(shù)。
  7. 設(shè)置 key 的過期時(shí)間。
  8. 加鎖成功返回 null。
  9. 加鎖失敗,返回 key 的剩余生存時(shí)間(單位毫秒)。

鎖自動(dòng)續(xù)期(Watch Dog 機(jī)制)

在不指定鎖超時(shí)時(shí)間(leaseTime)的情況下,Redisson 分布式鎖會(huì)自動(dòng)給鎖續(xù)期,也就是所謂的看門狗機(jī)制。

鎖自動(dòng)續(xù)期代碼解析:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 如果指定了鎖的有效期,則直接返回加鎖結(jié)果,不會(huì)走后面的 Watch Dog 機(jī)制
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 實(shí)際加鎖
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // 加鎖執(zhí)行完成后
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        // 加鎖執(zhí)行有異常,直接返回
        if (e != null) {
            return;
        }


        // lock acquired
        // 獲取到鎖
        if (ttlRemaining == null) {
            // 自動(dòng)續(xù)期(watch dog)
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

復(fù)制代碼

private void scheduleExpirationRenewal(long threadId) {
    // ExpirationEntry 維護(hù)鎖的線程重入計(jì)數(shù)器和續(xù)期任務(wù)
    ExpirationEntry entry = new ExpirationEntry();
    // 將 entry 放入 ConcurrentHashMap
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
      // 鎖重入,當(dāng)前線程計(jì)數(shù)器+1
      oldEntry.addThreadId(threadId);
    } else {
      // 第一次,當(dāng)前線程計(jì)數(shù)器+1
      entry.addThreadId(threadId);
      // 第一次觸發(fā)鎖續(xù)期
      renewExpiration();
    }
}

復(fù)制代碼

 private void renewExpiration() {
     // 在 ConcurrentHashMap 中拿到 ExpirationEntry 對(duì)象
     ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
     if (ee == null) {
       return;
     }
     // 新建一個(gè)定時(shí)任務(wù),自動(dòng)續(xù)期的主要實(shí)現(xiàn)
     Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
       @Override
       public void run(Timeout timeout) throws Exception {
         ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
         if (ent == null) {
           return;
         }
         // 獲取第一個(gè)線程Id
         Long threadId = ent.getFirstThreadId();
         if (threadId == null) {
           return;
         }
         // 異步續(xù)期
         RFuture<Boolean> future = renewExpirationAsync(threadId);
         future.onComplete((res, e) -> {
           if (e != null) {
             // 續(xù)期異常,打印錯(cuò)誤日志,并且清除Map,不再執(zhí)行續(xù)期。
             log.error("Can't update lock " + getName() + " expiration", e);
             EXPIRATION_RENEWAL_MAP.remove(getEntryName());
             return;
           }
           // 續(xù)期成功后,遞歸調(diào)用,繼續(xù)調(diào)用達(dá)到持續(xù)續(xù)期目的
           if (res) {
             // reschedule itself
             renewExpiration();
           }
         });
       }
     // 延遲執(zhí)行時(shí)間為 internalLockLeaseTime / 3,internalLockLeaseTime 默認(rèn)時(shí)間是 30s,也可以自定義指定。
     }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);


     ee.setTimeout(task);
 }

復(fù)制代碼

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
               "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  // 如果存在指定的 key 和 filed 
               "redis.call('pexpire', KEYS[1], ARGV[1]); " +                // 續(xù)期
               "return 1; " +                                               // 返回續(xù)期成功
               "end; " +
               "return 0;",                                                 // 返回續(xù)期失敗
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

復(fù)制代碼

// 看門狗超時(shí)時(shí)間默為 30s, 自定義的話可以修改 lockWatchdogTimeout 配置
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();


private long lockWatchdogTimeout = 30 * 1000;

復(fù)制代碼

鎖自動(dòng)續(xù)期總結(jié):

  1. 在沒有指定鎖超時(shí)時(shí)間(leaseTime)的情況下,加鎖成功后就會(huì)執(zhí)行自動(dòng)續(xù)期。
  2. 如果當(dāng)前線程持有的是重入鎖,則對(duì)鎖重入次數(shù)+1,如果是首次加鎖,除了鎖次數(shù)+1 還需要執(zhí)行鎖續(xù)期。這里需要清楚是只有首次加鎖才會(huì)續(xù)期,重入鎖不會(huì)執(zhí)行續(xù)期操作。將鎖對(duì)應(yīng)的線程 Id 及重入次數(shù)放入對(duì)象 ExpirationEntry 中, ExpirationEntry 對(duì)像使用 LinkedHashMap 維護(hù)了鎖的線程 Id 和重入計(jì)數(shù)器。然后將 ExpirationEntry 對(duì)象放 EXPIRATION_RENEWAL_MAP (ConcurrentHashMap), EXPIRATION_RENEWAL_MAP 中存放著所有需要續(xù)期的鎖。
  3. 新建一個(gè)延遲任務(wù),10s(默認(rèn))之后執(zhí)行,在 EXPIRATION_RENEWAL_MAP 中取出 ExpirationEntry 對(duì)象,拿到第一個(gè)線程 Id,然后執(zhí)行 Lua 腳本,檢查線程 Id 對(duì)應(yīng)的 key 和 filed 是否存在(鎖),如果存在則重置鎖的超時(shí)時(shí)間為 30s(默認(rèn)),如果不存在則說明已經(jīng)解鎖了不需要續(xù)期。
  4. 續(xù)期成功后,繼續(xù)遞歸調(diào)用步驟 3,保證持續(xù)鎖續(xù)期,續(xù)期失敗則說明鎖已經(jīng)不存在了,停止續(xù)期。

當(dāng)服務(wù)宕機(jī)時(shí),看門狗的線程也就不存在了,此時(shí)也就不會(huì)對(duì)鎖進(jìn)行自動(dòng)續(xù)期,到了 30s 鎖就會(huì)自動(dòng)過期,其他線程就可以獲取鎖了,不會(huì)造成死鎖。

解鎖

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +                  // 如果鎖不存在
                    "return nil;" +                                                      // 解鎖失敗
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +    // 否則將鎖的重入計(jì)數(shù)器-1
                    "if (counter > 0) then " +                                           // 如果重入計(jì)數(shù)器>0
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +                        // 將鎖續(xù)期 30s
                    "return 0; " +                                                       // 返回成功
                    "else " + 
                    "redis.call('del', KEYS[1]); " +                                     // 否則刪除鎖    
                    "redis.call('publish', KEYS[2], ARGV[1]); " +                        // 發(fā)布解鎖消息
                    "return 1; " +                                                       // 返回解鎖成功
                    "end; " +
                    "return nil;",                                                       // 解鎖失敗
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

復(fù)制代碼

流程總結(jié):

  1. 判斷鎖是否存在,如果鎖不存在直接返回 null。
  2. 重入計(jì)數(shù)器減一(因?yàn)橹С种厝腈i的緣故,這里不能直接將鎖刪除)。
  3. 如果重入計(jì)數(shù)器還是大于零,說明線程還是持有鎖的,將鎖續(xù)期 30s,返回成功。
  4. 否則刪除鎖,并且發(fā)送刪除鎖的消息(channelName:redisson_lock__channel:{鎖 key 值}),以通知阻塞隊(duì)列中的線程嘗試加鎖。
  5. 返回解鎖成功。

總結(jié)

  • RedissonLock 實(shí)現(xiàn)了鎖等待(waitTime),鎖重入,鎖自動(dòng)續(xù)期等復(fù)雜功能。
  • RedissonLock 實(shí)現(xiàn)的分布式鎖使用的是 Hash 數(shù)據(jù)結(jié)構(gòu),其中 Hash key 是我們指定鎖的 key 值, filed 是 UUID:threaId,value 是重入鎖次數(shù)。其中 UUID 是 Redisson 客戶端連接管理器實(shí)例初始化生成的 UUID。使用 Hash 數(shù)據(jù)結(jié)構(gòu),是實(shí)現(xiàn)鎖重入的關(guān)鍵。
  • RedissonLock 加鎖,解鎖,看門狗都是用了 Lua 腳本,保證命令執(zhí)行的原子性。
  • RedissonLock 實(shí)現(xiàn)鎖等待時(shí)間(waitTime)不是使用的 while(true) 手段,而是使用的 Redis 發(fā)布訂閱,semaphore(信號(hào)量)實(shí)現(xiàn)的,解決了無效鎖申請(qǐng)?jiān)斐傻南到y(tǒng)資源浪費(fèi)問題。
  • 具體實(shí)現(xiàn)是使用 semaphore 進(jìn)行帶期限的阻塞線程,當(dāng)鎖釋放時(shí)會(huì)發(fā)布鎖釋放的消息,收到解鎖消息后調(diào)用 release() 方法,此時(shí)被 semaphore 阻塞的等待隊(duì)列中的一個(gè)線程就可以嘗試獲取鎖了,如果在指定期限內(nèi)未獲得鎖,則獲取鎖失敗。
  • 只有未設(shè)置鎖超時(shí)時(shí)間(leaseTime),才能使用 Redisson 看門狗機(jī)制。

七、Redis 高可用架構(gòu)下的分布式鎖問題

上面講 Redisson 實(shí)現(xiàn)的分布式鎖,在單機(jī)模式下已經(jīng)趨近完美了。

但是單點(diǎn)的話故障的話,那就芭比 Q 了,所以我們第一點(diǎn)想到的是部署高可用集群。

目前 Redis 高可用架構(gòu)主要有主從模式,哨兵模式,集群模式,在這三種模式下使用 Redis 分布式鎖存在一個(gè)弊端,可能會(huì)導(dǎo)致多個(gè)客戶端同時(shí)加鎖成功。

Redis 分布式鎖詳解

 

客戶端 A 加鎖成功,由于 Reids 主從同步數(shù)據(jù)是異步執(zhí)行的,LockA 鎖還沒來的及同步到 Slave,此時(shí) Master 節(jié)點(diǎn)宕機(jī)了。

Slave 節(jié)點(diǎn)提升為 Master,客戶端 B 來加鎖,發(fā)現(xiàn)沒有其他客戶端占用鎖,LockB 加鎖成功。

這時(shí)就導(dǎo)致了兩個(gè)客戶端同時(shí)獲取了鎖。

所以,如果使用 Redis 分布式鎖,應(yīng)盡量避免主從、哨兵或集群模式。

八、紅鎖(Redlock)

1. Redlock 概念

RedLock 是 Redis 作者提出的一個(gè)算法。

Redlock 官網(wǎng)介紹

在該算法的分布式版本中,我們假設(shè)有 N 個(gè) Redis masters。這些節(jié)點(diǎn)是完全獨(dú)立的,所以我們不使用復(fù)制或任何其他隱式協(xié)調(diào)系統(tǒng)。我們已經(jīng)描述了如何在單個(gè)實(shí)例中安全地獲取和釋放鎖。我們想當(dāng)然地認(rèn)為,算法將使用這種方法在單個(gè)實(shí)例中獲取和釋放鎖。在我們的示例中,我們?cè)O(shè)置了 N=5,這是一個(gè)合理的值,因此我們需要在不同的計(jì)算機(jī)或虛擬機(jī)上運(yùn)行 5 個(gè) Redis 主機(jī),以確保它們以基本獨(dú)立的方式失敗。

為了獲取鎖,客戶端執(zhí)行以下操作:

  1. 以毫秒為單位獲取當(dāng)前時(shí)間。
  2. 使用相同的 key 和隨機(jī)值在所有 Redis 實(shí)例中順序獲取鎖。當(dāng)在每個(gè)實(shí)例中獲取鎖時(shí),客戶端使用一個(gè)超時(shí),該超時(shí)與鎖自動(dòng)釋放的總時(shí)間相比很小,以便獲取它。例如,如果自動(dòng)釋放時(shí)間為 10 秒,則超時(shí)時(shí)間可能在 5-50 毫秒范圍內(nèi)。這可以防止客戶端在嘗試與已關(guān)閉的 Redis 節(jié)點(diǎn)通話時(shí)長(zhǎng)時(shí)間處于阻塞狀態(tài):如果某個(gè)實(shí)例不可用,我們應(yīng)該盡快嘗試與下一個(gè)實(shí)例通話。
  3. 客戶端通過從當(dāng)前時(shí)間中減去在步驟 1 中獲得的時(shí)間戳來計(jì)算獲取鎖所用的時(shí)間。當(dāng)且僅當(dāng)客戶端能夠在大多數(shù)實(shí)例(至少 3 個(gè))中獲取鎖,并且獲取鎖所用的總時(shí)間小于鎖有效時(shí)間,則認(rèn)為已獲取鎖。
  4. 如果獲得了鎖,其有效時(shí)間將被視為初始有效時(shí)間減去經(jīng)過的時(shí)間,如步驟 3 中計(jì)算的。
  5. 如果客戶端由于某種原因(無法鎖定 N/2+1 實(shí)例或有效期為負(fù))未能獲取鎖,它將嘗試解鎖所有實(shí)例(即使是它認(rèn)為無法鎖定的實(shí)例)。

Redis 作者對(duì)紅鎖的介紹非常詳細(xì),點(diǎn)擊這里查看。

簡(jiǎn)單總結(jié)下:

假設(shè)有五個(gè) Redis 實(shí)例,這些實(shí)例之間是完全獨(dú)立的,并且部署在不同的計(jì)算機(jī)上,客戶端嘗試在這幾個(gè)實(shí)例中獲取鎖。

如果客戶端能夠在大多數(shù)實(shí)例(N/2+1,至少三個(gè))中獲取鎖,并且獲取鎖所有的總時(shí)間小于鎖有效時(shí)間,則認(rèn)為獲取鎖成功。

如果加鎖成功,鎖的有效期=初始有效時(shí)間-獲取鎖的總時(shí)間,假如鎖有效期為 10s,獲取鎖共花了 2s,那么鎖的有效期還剩 8s。

無論客戶端獲取鎖成功還是失敗,都需要解鎖所有 Redis 實(shí)例,以免發(fā)生死鎖。

Redis 分布式鎖詳解

 

使用多個(gè)完全獨(dú)立的 Redis 實(shí)例,解決了 Redis 主從異步復(fù)制造成的鎖丟失問題,同時(shí)保障了高可用。

至少 N/2+1 個(gè)實(shí)例加鎖成功,保證鎖的互斥性,防止多個(gè)客戶端同時(shí)獲取到鎖。

2. Redlock 存在問題

表面上看 RedLock 解決 Redis 分布式鎖的痛點(diǎn),但是真的就萬無一失了嗎?

有人就提出了質(zhì)疑,Martin Kleppmann: How to do distributed locking

Martin Kleppmann 在效率和正確性方面質(zhì)疑了紅鎖,他認(rèn)為如果是為了效率使用分布式鎖,沒有必要承擔(dān) Redlock 的成本和復(fù)雜性,最好還是使用一個(gè) Reids 實(shí)例或者主從模式。正確性方面,他認(rèn)為 Redlock 也絕對(duì)保證不了鎖的正確性,文章在網(wǎng)絡(luò)延遲,過程暫停(GC),時(shí)鐘漂移方面給出了論證。

Redis 作者(Salvatore)也反駁了該質(zhì)疑:Is Redlock safe?

建議大家讀下上面兩篇文章。

我個(gè)人認(rèn)為使用 Redlock 要慎重,首先,它的效率比較差,在一些 RT 要求比較高的接口中增加了耗時(shí)風(fēng)險(xiǎn);其次,無法保證絕對(duì)的正確性,可能會(huì)出現(xiàn)多個(gè)客戶端同時(shí)獲取鎖的風(fēng)險(xiǎn)(Martin Kleppmann 在他的文章里有舉證);再次,成本和復(fù)雜性較高。

3. Redisson紅鎖使用

使用示例:

// 在不同Redis實(shí)例上獲取 RLock
RLock rLock1 = redisson1.getLock(key);
RLock rLock2 = redisson2.getLock(key);
RLock rLock3 = redisson3.getLock(key);
// 初始化紅鎖
RedissonRedLock redissonRedLock = new RedissonRedLock(rLock1, rLock2, rLock3);
// 加鎖
redissonRedLock.lock();
// 業(yè)務(wù)邏輯
// 解鎖
redissonRedLock.unlock();

復(fù)制代碼

Redis 分布式鎖詳解

 

Redisson 在新版本中已經(jīng)棄用了 RedissonRedLock,不建議使用。

分享到:
標(biāo)簽:分布式
用戶無頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定