4233永利皇宫_www.4233.com_永利皇宫信誉总站

热门关键词: 4233永利皇宫,www.4233.com,永利皇宫信誉总站

所以考虑可以使用SETNX,常见3种分布式的实现比

日期:2019-09-29编辑作者:4233永利皇宫

原标题:基于Redis实现分布式锁-Redisson使用及源码分析【面试+工作】

在多线程开发中我们使用锁来避免线程争夺共享资源。在分布式系统中,程序在多个节点上运行无法使用单机锁来避免资源竞争,因此我们需要一个锁服务来避免多个节点上的进程争夺资源。

常见3种分布式的实现比较

业务背景:存储请求参数token ,token唯一 ,且新的生成旧的失效

基于Redis实现分布式锁-Redisson使用及源码分析【面试+工作】

Redis数据库基于内存,具有高吞吐量、便于执行原子性操作等特点非常适合开发对一致性要求不高的锁服务。

  • 基于数据库实现分布式锁
  • 基于缓存实现分布式锁(redis,mc)
  • 基于Zookeeper实现分布式锁

思路:因为是多台机器,获取token存入redis,保持唯一,考虑使用redis来加锁,其实就是在redis中存一个key,其他机器发现key有值的话就不进行获取token的请求。

在分布式场景下,有很多种情况都需要实现最终一致性。在设计远程上下文的领域事件的时候,为了保证最终一致性,在通过领域事件进行通讯的方式中,可以共享存储(领域模型和消息的持久化数据源),或者做全局XA事务(两阶段提交,数据源可分开),也可以借助消息中间件(消费者处理需要能幂等)。通过Observer模式来发布领域事件可以提供很好的高并发性能,并且事件存储也能追溯更小粒度的事件数据,使各个应用系统拥有更好的自治性。

本文介绍了简单分布式锁、Redisson分布式锁的实现以及解决单点服务的RedLock分布式锁概念。

Redission

SET操作会覆盖原有值,SETEX虽然可设置key过期时间,但也会覆盖原有值,所以考虑可以使用SETNX

本文主要探讨另外一种实现分布式最终一致性的解决方案——采用分布式锁。基于分布式锁的解决方案,比如zookeeper,redis都是相较于持久化(如利用InnoDB行锁,或事务,或version乐观锁)方案提供了高可用性,并且支持丰富化的使用场景。 本文通过Java版本的redis分布式锁开源框架——Redisson来解析一下实现分布式锁的思路。

Redis是一致性较低的数据库,若对锁服务的一致性要求较高建议使用zookeeper等中间件开发锁服务。

1. 简介

redission为redis官方推荐方式翻译,github地址。redisson-quick-start

SETNX Key value

分布式锁的使用场景

基于单点Redis的分布式锁

Redis实现分布式锁的原理非常简单, 节点在访问共享资源前先查询redis中是否有该资源对应的锁记录, 若不存在锁记录则写入一条锁记录(即获取锁)随后访问共享资源. 若节点查询到redis中已经存在了资源对应的锁记录, 则放弃操作共享资源.

下面给出一个非常简单的分布式锁示例:

import redis.clients.jedis.Jedis;

import java.util.Random;
import java.util.UUID;


public class MyRedisLock {

    private Jedis jedis;

    private String lockKey;

    private String value;

    private static final Integer DEFAULT_TIMEOUT = 30;

    private static final String SUFFIX = ":lock";

    public MyRedisLock(Jedis jedis) {
        this.jedis = jedis;
    }

    public boolean acquire(String key, long time) throws InterruptedException {
        Long outdatedTime = System.currentTimeMillis() + time;
        lockKey = key + SUFFIX;
        while (true) {
            if (System.currentTimeMillis() >= outdatedTime) {
                return false;
            }
            value = UUID.randomUUID().toString(); // 1
            return "OK".equals(jedis.set(lockKey, value, "NX", DEFAULT_TIMEOUT)); // 2
        }
    }

    public boolean check() {
        return value != null && value.equals(jedis.get(lockKey)); // 3
    }

    public boolean release() {
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        return 1L.equals(jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value))); // 3
    }
}

加锁后所有对共享资源的操作都应该先检查当前线程是否仍持有锁。

在分布式锁的实现中有几点需要注意:

  1. 加锁过程:
    1. 锁的过期时间应设置到redis中,保证在加锁客户端故障的情况下锁可以被自动释放
    2. 使用set key value EX seconds NX命令进行加锁,不要使用setnx和expire两个命令加锁。
      若setnx执行成功而expire失败(如执行setnx后客户端崩溃),则可能造成死锁。
    3. 锁记录的值不能使用固定值。 使用固定值可能导致严重错误: 线程A的锁因为超时被释放, 随后线程B成功加锁。 B写入的锁记录与A的锁记录没有区别, 因此A在检查时会误判为自己仍持有锁。
  2. 解锁过程:
    1. 解锁操作使用lua脚本执行get和del两个操作,为了保证两个操作的原子性。若两个操作不具有原子性则可能出现错误时序: 线程A执行get操作判断自己仍持有锁 -> 锁超时释放 -> 线程B成功加锁 -> 线程A删除锁记录(线程A认为删除了自己的锁记录,实际上删除了线程B的锁记录)。

上文只是提供了简单示例,还有一些重要功能没有实现:

  1. 阻塞加锁:可以使用redis的发布订阅功能,获取锁失败的线程订阅锁被释放的消息再次尝试加锁
  2. 无限期锁:应写入有TTL的锁记录,设置定时任务在锁失效前刷新锁过期的时间。这种方式可以避免持有锁的线程崩溃导致的死锁
  3. 可重入锁(持有锁的线程可以再次加锁):示例中持有锁的线程无法对同一个资源再次加锁,即不可重入锁。实现可重入锁需要锁记录由(key:资源标记, value:持有者标记)的键值对结构变为(key:资源标记, field:持有者标记, value:计数器)这样的hash结构。持有锁的线程每次重入锁计数器加1,每次释放锁计数器减1,计数器为0时删除锁记录。

总结来看实现Redis分布式锁有几点需要注意:

  1. 加解锁操作应保证原子性,避免多个线程同时操作出现异常
  2. 应考虑进程崩溃、Redis崩溃、操作成功执行但未收到成功响应等异常状况,避免死锁
  3. 解锁操作必须避免 某个线程释放了不属于自己的锁 的异常

2. 实现分析

将 key 的值设为 value ,当且仅当 key 不存在。

如果是不跨限界上下文的情况,跟本地领域服务相关的数据一致性,尽量还是用事务来保证。但也有些无法用事务或者乐观锁来处理的情况,这些情况大多是对于一个共享型的数据源,有并发写操作的场景,但又不是对于单一领域的操作。

Redisson

这里我们以基于Java的Redisson为例讨论一下成熟的Redis分布式锁的实现。

redisson实现了java.util.concurrent.locks.Lock接口,可以像使用普通锁一样使用redisson:

RLock lock = redisson.getLock("key"); 
lock.lock(); 
try {
    // do sth.
} finally {
    lock.unlock(); 
}

分析一下RLock的实现类org.redisson.RedissonLock:

2.1 解决的问题

 

举个例子,还是用租书来比喻,A和B两个人都来租书,在查看图书的时候,发现自己想要看的书《大设计》库存仅剩一本。书店系统中,书作为一种商品,是在商品系统中,以Item表示出租商品的领域模型,同时每一笔交易都会产生一个订单,Order是在订单系统(交易限界上下文)中的领域模型。这里假设先不考虑跨系统通信的问题,也暂时不考虑支付环节,但是我们需要保证A,B两个人不会都对于《大设计》产生订单就可以,也就是其中一个人是可以成功下单,另外一个人只要提示库存已没即可。此时,书的库存就是一种共享的分布式资源,下订单,减库存就是一个需要保证一致性的写操作。但又因为两个操作不能在同一个本地事务,或者说,不共享持久化的数据源的情况,这时候就可以考虑用分布式锁来实现。本例子中,就需要对于共享资源——书的库存进行加锁,至于锁的key可以结合领域模型的唯一标识,如itemId,以及操作类型(如操作类型是RENT的)设计一个待加锁的资源标识。当然,这里还有一个并发性能的问题,如果是个库存很多的秒杀类型的业务,那么就不能单纯在itemId 加类型加锁,还需要设计排队队列以及合理的调度算法,防止超卖等等,那些就是题外话了。本文只是将这个场景作为一个切入点,具体怎么设计锁,什么场景用还要结合业务。

加锁操作

@Override
public void lock() {
    try {
        lockInterruptibly();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

@Override
public void lockInterruptibly() throws InterruptedException {
    lockInterruptibly(-1, null);
}

再看等待加锁的方法lockInterruptibly:

@Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
    }

lockInterruptibly 方法会尝试获取锁,若获取失败则会订阅释放锁的消息。收到锁被释放的通知后再次尝试获取锁,直到成功或者超时。

接下来分析tryAcquire:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId)); // 调用异步获得锁的实现,使用get(future)实现同步
}

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    // 设置了超时时间
    if (leaseTime != -1) {
        // tryLockInnerAsync 加锁成功返回 null, 加锁失败在 Future 中返回锁记录剩余的有效时间
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 未设置超时时间,尝试获得无限期的锁
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
                // 避免对共享资源操作完成前锁就被释放掉,定期刷新锁失效的时间
                // 默认锁失效时间的三分之一即进行刷新
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

tryAcquireAsync中主要逻辑是无限期锁的实现,Redisson并非设置了永久的锁记录,而是定期刷新锁失效的时间。

这种方式避免了持有锁的进程崩溃无法释放锁导致死锁。

真正实现获取锁逻辑的是tryLockInnerAsync方法:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return commandExecutor.evalWriteAsync(
        getName(),
        LongCodec.INSTANCE, 
        command,
          "if (redis.call('exists', KEYS[1]) == 0) then " + // 资源未被加锁
              "redis.call('hset', KEYS[1], ARGV[2], 1); " + // 写入锁记录, 锁记录是一个hash; key:共享资源名称, field:锁实例名称(Redisson客户端ID:线程ID), value: 1(value是一个计数器,记录当前线程获取该锁的次数,实现可重入锁)
              "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置锁记录过期时间
              "return nil; " +
          "end; " +
          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 若当前线程已经持有该资源的锁
              "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 将锁计数器加1, 
              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
              "return nil; " +
          "end; " +
          "return redis.call('pttl', KEYS[1]);", // 资源已被其它线程加锁,加锁失败。获取锁剩余生存时间后返回
        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

上述操作使用eval命令执行lua脚本保证了操作的原子性。

安全和活跃性保证

  • 安全

任何时候只有一个客户端可以获得锁

  • 活跃属性
  1. 死锁自由(即使一个客户端已经拥用了已损坏或已被分割资源的锁,但它也有可能请求其他的锁)

举例:竞态条件

客户端A在主节点获得了一个锁。
主节点挂了,而到从节点的写同步还没完成。
从节点被提升为主节点。
客户端B获得和A相同的锁。注意,锁安全性被破坏了!
  1. 容错(只要大部分Redis节点可用, 客户端就可以获得和释放锁)

  2. 实现思路


若给定的 key 已经存在,则 SETNX 不做任何动作

领域服务概念

unlock

解锁过程相对简单:

@Override
public void unlock() {
    Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                + id + " thread-id: " + Thread.currentThread().getId());
    }
    if (opStatus) {
        cancelExpirationRenewal();
    }
}

unlockInnerAsync方法实现了具体的解锁逻辑:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " + // 资源未被加锁,可能锁已被超时释放
                "redis.call('publish', KEYS[2], ARGV[1]); " + // 发布锁被释放的消息
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 锁的持有者不是自己,抛出异常
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 自己持有锁,因为锁是可重入的将计数器减1
            "if (counter > 0) then " + // 计数器大于0,锁未被完全释放,刷新锁过期时间
                "redis.call('pexpire', KEYS[1], ARGV[2]); " + 
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " + // 锁被完全释放,删除锁记录,发布锁被释放的消息
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

}

3.1 单例演变

SET resource_name my_random_value NX PX 30000

解释:
设置key的值,仅当其不存在时生效(NX选项), 且设置其生存期为30000毫秒(PX选项)。和key关联的value值是"my_random_value"。这个值在所有客户端和所有加锁请求中是必须是唯一的

  • 随机唯一
    1. 可以确保当前锁是该客户端,而防止其他客户端释放、删掉。
    2. 一般组合unix时间戳和客户端ID+随机数(/dev/urandom初始化RC4算法)
  • 锁拥有时长
    1. 一种是指锁的自动释放时长
    2. 另一种是指在另一个客户端获取锁之前某个客户端占用这个锁的时长,这被限制在从锁获取后开始的一段时间窗口内。
  • 简单实现
设置锁的超时时间
SET resource_name my_random_value NX PX 30000

获取锁 == myKey,释放
否则:return 0
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

 

借用《Implementing Domain-driven Design》里面的对于领域服务的定义。领域的某个操作过程或转换过程不是实体或值对象的职责时,应该将操作放在一个单独的接口中,即领域服务,并且要和通用语言保持一致。这里的非实体或值对象操作会有很多种情况,比如某个操作需要对多个领域对象操作,输出一个值对象。在分层的架构中,有点类似于Manager。但是如果过渡抽象manager就会出现贫血,所以还需要确保领域服务是无状态的,并且做好和贫血模型的权衡。可能大多数情况,领域服务的参数都是比实际的领域模型小的,只有些关键属性的值对象。如果服务只操作领域的实体或值对象,则可以考虑下放到domain model中操作。

RedLock

基于单点的分布式锁无法解决redis故障的问题. 为了保证redis的可用性我们通常采用主从备份的方法, 即 使用一个master实例和至少一个slave实例.

当有写入请求时先写入master然后写入到所有slave, 当master实例故障时选择一个slave实例升级为master实例继续提供服务.

其中存在的问题是, 写入master和写入slave存在时间差. 若线程A成功将锁记录写入了master, 随后在同步写入slave之前, master故障转移到slave.

因为slave(新master)中没有锁记录, 因此线程B也可以成功加锁, 因此可能出现A和B同时持有锁的错误.

为了解决redis失效可能造成的问题, redis的作者antirez提出了RedLock实现方案:

  1. 客户端获取当前时间

  2. 客户端尝试获取N个节点的锁, 每个节点使用相同的key和value. 请求超时时间要远小于锁超时时间, 避免在节点或者网络故障时浪费时间.

  3. 客户端计算在加锁时消耗的时间, 只有客户端成功获得超过一半节点的锁且总时间小于锁超时间时才能成功加锁. 客户端持有锁的时间为锁超时时间减去加锁消耗的时间.

  4. 若获取锁失败则访问所有节点, 发起释放锁的请求.

释放锁时需要向所有Redis节点发出释放锁的请求, 原因在于可能某个Redis实例中成功写入了锁记录, 但是没有响应没有到达客户端.

为了保证所有锁记录都被正确释放, 所以需要向所有Redis实例发送释放请求.

实现

成功返回1,失败返回0。

前面提到了Manager,但是很多应用中都会把Manager抽象成接口的形式,但大多数情况其实完全没有必要,可以通过服务Factory的方式解耦,或者用Spring的@Service注解来注入真正的服务实现类。对于一些简单的领域操作,还可以抽象一个迷你层,这个迷你层也可以称作是领域服务,只不过是无状态,无事务,安全的一个抽象层。

关于安全性的讨论

关于RedLock的安全性问题, Martin Kleppmann和作者antirez进行了一些讨论:

  • Martin Kleppmann: How to do distributed locking
  • antirez:[Is Redlock safe?](http://antirez.com/news/101)

关于这场讨论的分析可以参考:

  • 基于Redis的分布式锁到底安全吗?

关键点

  • 锁的时效设置。避免单点故障造成死锁,影响其他客户端获取锁。但是也要保证一旦一个客户端持锁,在客户端可用时不会被其他客户端解锁。(网上很多解决方案都是其他客户端等待队列长度判断是否强制解锁,但其实在偶发情况下就不能保证一致性,也就失去了分布式锁的意义)。
  • 持锁期间的check,尽量在关键节点检查锁的状态,所以要设计成可重入锁,但在客户端使用时要做好吞吐量的权衡。
  • 减少获取锁的操作,尽量减少redis压力。所以需要让客户端的申请锁有一个等待时间,而不是所有申请锁的请求要循环申请锁。
  • 加锁的事务或者操作尽量粒度小,减少其他客户端申请锁的等待时间,提高处理效率和并发性。
  • 持锁的客户端解锁后,要能通知到其他等待锁的节点,否则其他节点只能一直等待一个预计的时间再触发申请锁。类似线程的notifyAll,要能同步锁状态给其他客户端,并且是分布式消息。
  • 考虑任何执行句柄中可能出现的异常,状态的正确流转和处理。比如,不能因为一个节点解锁失败,或者锁查询失败(redis 超时或者其他运行时异常),影响整个等待的任务队列,或者任务池。

看上去SETNX 配合 EXPIRE(过期时间)是个不错的选择,于是就有了加锁错误示例1:

jedis.setnx("lockName","value");
//这里redis挂掉,就是一个死锁
jedis.expire("lockName",10);

因为这两个操作不具备原子性,所以可能出现死锁,之所以有这样的示例,是因为低版本的redis的SET还不支持多参数命令

从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改
EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
XX :只在键已经存在时,才对键进行设置操作。

这里可以引出 redis正确的加锁示例:

 public static boolean lock(Jedis jedis, String lockKey, String uid, int expireTime) {  

        String result = jedis.set(lockKey, uid,"NX" "PX", expireTime);  

        if ("OK".equals(result)) {  
            return true;  
        }  
        return false;  

    } 

 其实就等于在redis中执行了 :set key value nx px 10000

4233永利皇宫 1

第一个为key,我们使用key来当锁名

第二个为value,我们传的是uid,唯一随机数,也可以使用本机mac地址 + uuid

第三个为NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作 第四个为PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定 第五个为time,代表key的过期时间,对应第四个参数 PX毫秒,EX秒

 

再来看一下分布式锁的要求:

 

分布式锁是用于解决分布式系统中操作共享资源时的数据一致性问题

 

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

 

领域事件其实也可以归纳为领域服务,不过领域服务的事件是幂等的。因为领域服务是无事务的,所以事件也是无副作用的,这样在处理聚合依赖的时候,需要保证他们的最终一致性。

锁设计

  • 加锁
// 检查是否key已经被占用,如果没有则设置超时时间和唯一标识,初始化value=1
if (redis.call('exists', key) == 0) 
then  
redis.call('hset', key, key-uunid, 1); //hset key field value
redis.call('pexpire', key, timeout);  //设置超时时间为毫秒
return null; 
end; 
// 如果锁重入,需要判断锁的key field 的情况 
if (redis.call('hexists', key, key-unuid) == 1) 
then 
redis.call('hincrby', key, key-unuid, 1);//使字段值增加指定的整数
redis.call('pexpire', key, timeout);//锁重入重新设置超时时间
return null; 
end; 
// 返回剩余的过期时间
return redis.call('pttl', key);
  • 解锁
// 如果key已经不存在,说明已经被解锁,直接发布(publihs)redis消息
if (redis.call('exists', key) == 0) 
then
redis.call('publish', channelName, ARGV[1]);
    return 1;
end;
// key和field不匹配,说明当前客户端线程没有持有锁,不能主动解锁。
if (redis.call('hexists', key, key-uuid) == 0)
then 
    return null;
end; 
// 将value减1
local counter = redis.call('hincrby', key, key-uuid, -1); 
// 如果counter>0说明锁在重入,不能删除key
if (counter > 0)  
then
    redis.call('pexpire', key, timeout);                            
    return 0; 
else 
// 删除key并且publish 解锁消息
redis.call('del', key);                           
 redis.call('publish', channelName, ARGV[1]); 
return 1; 
end; 
return null;

互斥性。在任意时刻,只有一个客户端能持有锁。

{领域事件

Redisson源码解析

不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

将领域中发生的活动建模成一系列的离散事件,每个事件都用领域对象来表示。简而言之,领域事件就是领域中发生的事件。还拿租书为例,一本书被借走了,那么需要产生一个借书订单,并且对于租书者来说,需要能查看自己租书的列表和书籍详情,同时这本书也需要被标记为不能再借出的状态(因为已经被借走了)。这里面bookRent就可以作为一个领域事件来发出。

相关资料

分布式锁的几种实现方式
Redis实现分布式锁全局锁—Redis客户端Redisson中分布式锁RLock实现
基于Redis实现分布式锁,Redisson使用及源码分析

性能。排队等待锁的节点如果不知道锁何时会被释放,则只能隔一段时间尝试获取一次锁,这样无法保证资源的高效利用,因此当锁释放时,要能够通知等待队列,使一个等待节点能够立刻获得锁。

事件的聚合

重入。同一个线程可以重复拿到同一个资源的锁。

NX保证互斥性

PX保证不会死锁

Value传入的唯一标识保证是自己的锁(可以通过随机uuid+线程名称 来保证唯一)

对于上述的事件模型,我们可以创建具有聚合特性的领域事件。这里我们可以把这个事件本身建模成一个聚合(BookRentEvent 对象),并且有自己的持久化方式。唯一标识可以由一组属性决定,在客户方(Client)调用领域服务的时候创建这个领域事件{new bookRentEvent())},并添加到资源库中,然后再通过消息的方式进行发布。发布成功后再回调更新时间状态。但这里需要注意,消息发布最好和事件资源库在相同的上下文,或共享数据源,这样就可以保证事件的成功提交,在不同上下文系统,就需要做全局事务来保证。而唯一标识在这里的作用就是为了防止消息重发或者重复处理。所以订阅方需要检查重复消息,并且忽略。如果是本地上下文的事件,最好提供equals和hashcode 实现。

PS:因为 SET 命令可以通过参数来实现和 SETNX 、 SETEX 和 PSETEX 三个命令的效果,不知道将来的 Redis 版本会不会废弃 SETNX 、 SETEX 和 PSETEX 这三个命令 ?

 

下面看一个释放锁的错误示例

public static void wrongUnLock1(Jedis jedis, String lockKey, String requestId) {  

    // 判断加锁与解锁是不是同一个线程  
    if (requestId.equals(jedis.get(lockKey))) {  
        // lockkey锁失效,下一步删除的就是别人的锁  
        jedis.del(lockKey);  
    }  

}   

根本问题还是保证操作的原子性,因为是两步操作,即便判断到是当前线程的锁,但是也有可能再删除之前刚好过期,这样删除的就是其他线程的锁。

如果业务要求精细,我们可以使用lua脚本来进行完美解锁

/**
     * redis可以保证lua中的键的原子操作 unlock:lock调用完之后需unlock,否则需等待lock自动过期
     *
     * @param lock
     *  uid 只有线程已经获取了该锁才能释放它(uid相同表示已获取)
     */
    public  void unlock( String lock) {

        Jedis jedis = new Jedis("localhost");

        final String uid= tokenMap.get();
        if (StringUtil.isBlank(token))
            return;
        try {
            final String script = "if redis.call("get","" + lock + "") == "" +
             uid + ""then  return redis.call("del","" + lock + "") else return 0 end ";
            jedis.eval(script);
        } catch (Exception e) {
            throw new RedisException("error");
        } finally {
            if (jedis != null)
                jedis.close();
        }
    }

 

关于lua:

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

Lua 提供了交互式编程模式。我们可以在命令行中输入程序并立即查看效果。

 

结合刚才的例子,在书籍管理上下文中,书被借走了,那么书籍唯一表示和书的状态(Rent被借出)就可以标识一个事件。这个事件中需要有借书人的信息(如id,nick等),那么在持久化这个事件后,可以post一个Eventbus的本地消息,由用户书籍领域服务监听,更新用户书籍列表等一系列操作。然后再Callback到事件源,更新事件状态,处理成功。如果需要处理事件都在本地上下文,处理起来并不麻烦。

lua脚本优点

  • 减少网络开销:本来多次网络请求的操作,可以用一个请求完成,原先多次请求的逻辑放在redis服务器上完成。使用脚本,减少了网络往返时延

  • 原子操作:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入

  • 复用:客户端发送的脚本会永久存储在Redis中,意味着其他客户端可以复用这一脚本而不需要使用代码完成同样的逻辑

 上面这个脚本很简单

if redis.call("get","" + lock + "") // redisGET命令
 == "" +uid + // 判断是否是当前线程
 ""then  return redis.call("del","" + lock + "") // 如果是,执行redis DEL操作,删除锁
 else return 0 end  

 

同理我们可以使用lua给线程加锁

 

local lockkey = KEYS[1]
--唯一随机数
local uid = KEYS[2]
--失效时间,如果是当前线程,也是续期时间
local time = KEYS[3]

if redis.call('set',lockkey,uid,'nx','px',time)=='OK' then
return 'OK'
else
    if redis.call('get',lockkey) == uid then
       if redis.call('EXPIRE',lockkey,time/1000)==1 then
       return 'OOKK'
       end
    end
end

 

lua脚本也可以通过外部文件读取,方便修改

 

  public void luaUnLock() throws Exception{
        Jedis jedis = new Jedis("localhost") ;
        InputStream input = new FileInputStream("unLock.lua");
        byte[] by = new byte[input.available()];
        input.read(by);
        String script = new String(by);
        Object obj = jedis.eval(script, Arrays.asList("key","123"), Arrays.asList(""));
        System.out.println("执行结果 " + obj);
    }

 

PS:跟同事讨论的时候,想到可不可以利用redis的额事物来解锁,并没有实际使用,怕有坑。

发布领域事件

redis事物解锁

public boolean unLock(Jedis jedis, String lockName, String uid) throws Exception{
            jedis.watch(lockName);
            //这里的判断uid和下面的del虽然不是原子性,有了watch可以保证不会误删锁
            if (jedis.get(lockName).equals(uid)) {
                redis.clients.jedis.Transaction transaction = jedis.multi();
                transaction.del(lockName);
                List<Object> exec = transaction.exec();
                if (exec.get(0).equals("OK")) {
                    transaction.close();
                    return true;
                }
            }
            return false;
        }

 

 

领域事件的发布可以用Observer模式。在本地上下文,也要尽量减少对基础设施或者消息中间件暴露领域模型,所以,需要将本地模型(领域模型)封装成事件的聚合。比如我们不能直接发布一个BookRent聚合的事件,而是一个BookRentEvent,这个Event对象,还会持有一些事件特有的属性,比如可能根据需要,会有occurTime(发生时间),isConsumed(是否已经被处理)。事件发布时,所有订阅方都会同步收到通知。领域事件的主要组件就是publisher和subscriber了。

4233永利皇宫,可重入锁

可重入锁,也叫做递归锁,指的是同一线程 外层函数获得锁之后 ,内层递归函数仍然有获取该锁的代码,但不受影响。

 

在Java中用set命令实现可重入锁

 

//保存每个线程独有的token
    private static ThreadLocal<String> tokenMap = new ThreadLocal<>();

/**
     * 这个例子还不太完善。  
     * redis实现分布式可重入锁,并不保证在过期时间内完成锁定内的任务,需根据业务逻辑合理分配seconds
     *
     * @param lock
     *            锁的名称
     * @param mseconds
     *            锁定时间,单位 毫秒
     *  token 对于同一个lock,相同的token可以再次获取该锁,不相同的token线程需等待到unlock之后才能获取
     *
     */
    public  boolean lock(final String lock,  int mseconds ,Jedis jedis) {
        // token 对于同一个lock,相同的token可以再次获取该锁,不相同的token线程需等待到unlock之后才能获取
        String token = tokenMap.get();
        if (StringUtil.isBlank(token)) {
            token = UUID.randomUUID().toString().replaceAll("-","");
            tokenMap.set(token);
        }
        boolean flag = false;
        try {
            String ret = jedis.set(lock, token, "NX", "PX", mseconds);
            if (ret == null) {// 该lock的锁已经存在
                String origToken = jedis.get(lock);// 即使lock已经过期也可以
                if (token.equals(origToken) || origToken==null) {
                // token相同默认为同一线程,所以token应该尽量长且随机,保证不同线程的该值不相同
                    ret = jedis.set(lock, token, "NX", "PX", mseconds);//
                    if ("OK".equalsIgnoreCase(ret))
                        flag = true;
                    System.out.println("当前线程 " + token);
                }
            } else if ("OK".equalsIgnoreCase(ret))
                flag = true;
            System.out.println("当前线程 " + token);
        } catch (Exception e) {

        } finally {
            if (jedis != null)
                jedis.close();
        }
        return flag;
    }

 

继续正题,说到lua脚本 和 可重入锁,就不得不提 redission了

发送者

redission

redisson是redis官网推荐的java语言实现分布式锁的项目

redission中提供了多样化的锁,

发送者本身并不表达一种领域概念,而是作为一种服务的形态。无论用什么技术方式实现,用什么框架,处理事件发送的思路也都可能不尽相同。比如,在web应用中,可以在启动应用的时候处理订阅者向发送者的事件注册(避免注册和处理发送的线程同步问题)。比如可以将关注的事件registe到本地的一个ThreadLocal的publisher List中。应用启动完成后,开始处理领域事件的时候,就可以发送一个事件的聚合。这个事件的聚合是一个事件对象,而不是领域模型中的实体,因为我们要暴露需要暴露的事件给其他上下文,而不是暴露完整的领域对象。如果使用EventBus,我们可以在post的时候,封装一个事件作为参数。

可重入锁(Reentrant Lock)

订阅者

公平锁(Fair Lock)

事件的订阅者可以作为应用服务的一个独立的组件。因为应用服务是在领域逻辑的外层,如果是纯粹的事件驱动,那么订阅者作为一种应用服务,也可以定位成具有单一职责的,负责事件存储的应用服务组件。

联锁(MultiLock)

分布式领域事件

红锁(RedLock)

在处理分布式事件中,最重要也是最难处理的就是一致性。消息的延迟,处理的不幂等就会影响领域模型状态的准确性和事件的处理。但是我们在系统间交互的过程中,可以用一些技术方式来达到最终一致性。这其中可能就需要进行事件模型的持久化。处理方式可以

 读写锁(ReadWriteLock)

本文由4233永利皇宫发布于4233永利皇宫,转载请注明出处:所以考虑可以使用SETNX,常见3种分布式的实现比

关键词:

单身共享,从O2O兴趣社交延伸到O2O生活服务

原标题:陌生人社交小程序「拼拼碰碰」:拒绝网聊,直接奔现 《百姓中国周刊》北京讯2017年9月22日下午2点在北京...

详细>>

4233永利皇宫:对应平台的后门了,它本身附带数

原标题:我是这样黑进你Node.js生产服务器的 1.    –p (- -payload-options) MSFVENOM常规选项 1.    –p (- -payload-options) 添...

详细>>

房间的女主人,Home和Amazon Echo到底有什么不同

原标题:以Echo为例,从锂矿厂到数据矿厂的人工智能产业全景地图是这样的 一直在人工智能领域领先的Google,家庭智...

详细>>

4233永利皇宫经常有做微信的朋友,的设计目标就

原标题:揭秘 | 用户突破百万量级,微擎靠的是什么? 1、微信公共帐号机器人WeBot 经常有做微信的朋友,问我能不能...

详细>>