分布式锁

特性

  • 排他性,只有一个客户端能持有锁
  • 不死锁
  • 容错,部分redis节点挂了,也支持锁特性

Redis分布式锁

极客时间耗子叔分布式锁
Redis分布式锁官方文档
Is Redlock safe
How to do distributed locking

Redis 命令

  • 加锁
    SET NX 在key不存在的时候给key赋值,
    PX通知 Redis 保存这个key 30000ms(锁过期时间),当资源被锁定超过过期时间时,锁自动释放
    my_random_value 必须全局唯一

    1
    SET resource_name my_random_value NX PX 30000
  • 解锁
    根据my_random_value判断是否自己持有的锁,然后执行del

    1
    2
    3
    4
    5
    if redis.call("get",KEYS[1]) == ARGV[1] then 
    return redis.call("del",KEYS[1])
    else
    return 0
    end

Jedis 实现

  • 加锁
    value 使用UUID.randomUUID().toString()生成,也可以加入请求的唯一标识参数作为区分

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";
    /**
    * 尝试获取分布式锁
    * @param jedis Redis客户端
    * @param lockKey 锁Key
    * @param value 请求标识
    * @param expireTime 超期时间
    * @return 是否加锁成功
    */
    public static boolean lock(Jedis jedis, String lockKey, String value, int expireTime) {
    String result = jedis.set(lockKey, value, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
    if (LOCK_SUCCESS.equals(result)) {
    return true;
    }
    return false;
    }

    // 错误代码: setnx expire在程序上分两步,会导致过期时间没有成功设置,导致死锁,则需要做额外的包装处理
    Long result = jedis.setnx(lockKey, value);
    if (result == 1) {
    jedis.expire(lockKey, expireTime);
    }
  • 解锁
    利用Redis结合lua脚本保证判断和解锁的操作在redis上是原子操作

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private static final Long RELEASE_SUCCESS = 1L;
    /**
    * 释放分布式锁
    * @param jedis Redis客户端
    * @param lockKey 锁
    * @param value 加锁时生成的值
    * @return 是否释放成功
    */
    public static boolean unlock(Jedis jedis, String lockKey, String value) {
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(value));
    if (RELEASE_SUCCESS.equals(result)) {
    return true;
    }
    return false;
    }

    // 错误代码:多线程环境下1和2是不同步的
    public static void unlock(Jedis jedis, String lockKey, String value) {
    if (value.equals(jedis.get(lockKey))) { // 1
    jedis.del(lockKey);// 2
    }
    }

setnx expire 分两步的实现

公司框架对Jedis做了封装,没有使用上面的set加锁方法,而是setnx和expire两步走的方式, 则需要在外部加锁逻辑上加处理,解锁不变

  • 加锁

    1
    2
    3
    4
    5
    6
    7
    8
    // cache
    public boolean lock(String key, Object value, final int expireTime) {
    Long flag = sjedis.setnx(key, value);
    boolean result = (flag != null) && (flag.equals(Long.valueOf(1L)));
    if (result && expireTime > 0) {
    sjedis.expire(key, expireTime);
    }
    }
  • 加锁封装

    1
    2
    3
    4
    5
    6
    public boolean lockProxy(String key, Object value, final int expireTime) {
    boolean flag = cache.lock(key, value, expireTime);
    if (!flag && (cache.ttl(key) == -1)) { // 在加锁失败后ttl命令检查是否有超时时间,没有则需要补充上过期时间
    cache.setTtl(key, expireTime);
    }
    }

Redis锁的局限性

当任务(业务)时间因为多种因数(Full GC, RPC timeout)导致锁超时过期自动释放了,会导致后面出现多个客户端持有同一个锁,HBase 遇到过这种问题
unsafe
针对这种场景,需要使用fence,添加一个token或者version 来确保数据的正确性, 这就是一个典型的乐观锁实现
fence

所以,使用redis锁的一个前提是任务时间不能太长,如果为了防止不可控因数例如Full GC则需要加入乐观锁来控制,或者需要确保锁过期时,任务处理需要停下来,如果真需要用到乐观锁机制的话,redis锁已经不适用这个场景了
FIXME 关于redlock算法的理解?

zookeeper分布式锁

fixme https://blog.csdn.net/peace1213/article/details/52571445
https://dzone.com/articles/distributed-lock-using
http://colobu.com/2014/12/12/zookeeper-recipes-by-example-2/

乐观锁

不加锁,通过token识别是否出现数据冲突,乐观锁适用于读多写少的应用场景,提高吞吐量

  • vesion方式(version也可以使用时间戳代替)

    1
    UPDATE table_name SET xxx = #{xxx}, version=version+1 where version =#{version};
  • 不添加字段方式

    1
    2
    SELECT stock FROM tb_product where product_id=#{product_id};
    UPDATE tb_product SET stock=stock-#{num} WHERE product_id=#{product_id} AND stock=#{stock};

curator

https://curator.apache.org/curator-x-async/index.html

CAS

Compare And Swap 比较交换技术,无锁的设计思路,设计上更复杂

  • CAS算法
    一个CAS方法包含三个参数CAS(V,E,N)。V表示要更新的变量,E表示预期的值,N表示新值。只有当V的值等于E时,才会将V的值修改为N。如果V的值不等于E,说明已经被其他线程修改了,当前线程可以放弃此操作,也可以再次尝试次操作直至修改成功。基于这样的算法,CAS操作即使没有锁,也可以发现其他线程对当前线程的干扰(临界区值的修改),并进行恰当的处理。

  • AtomicInteger

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // volatile保证内存可见性,循环调用unsafe.compareAndSwapInt进行赋值,Unsafe类内部使用了native的CAS逻辑。
    private volatile int value;
    public final int getAndSet(int newValue) {
    for (;;) {
    int current = get();
    if (compareAndSet(current, newValue))
    return current;
    }
    }
    public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
  • FIXME CAS无锁队列的实现:https://coolshell.cn/articles/8239.html