在集群环境多节点下,存在多个进程竞争资源的问题,而传统Java自带的锁已经不能满足需求,这时就需要分布式锁。

分布式锁有多种实现方案,Zookeeper等,而Redis的分布式锁性能非常高。

模拟减库存场景
@Slf4j
@RequestMapping("/redis/lock")
@RestController
public class RedisLockController {

    @Autowired
    private StringRedisTemplate stringRedisTemplatel;

    // 1001商品的 redis key
    private String rediskey = "product:1001:";

    @PostConstruct
    public void initStock() {
        //初始化库存100
        stringRedisTemplatel.opsForValue().set(rediskey + "stock", String.valueOf(50));
        log.info("init stock 100");
    }

    //卖出商品接口
    @RequestMapping("sellProduct")
    public String sellProduct() {
        //尝试使用jdk内置锁
        synchronized (this) {
            ValueOperations<String, String> stringValueOperations = stringRedisTemplatel.opsForValue();
            //查询库存
            Integer stock = Integer.valueOf(stringValueOperations.get(rediskey + "stock"));
            if (stock <= 0) {
                //库存不足
                return "库存不足";
            }
            log.info("sell product , current stock:[{}] , surplus stock: [{}]", stock, --stock);
            stringValueOperations.set(rediskey + "stock", String.valueOf(stock));
        }
        return "成功售出";
    }
}

上诉代码,单机情况下是没有什么问题的,但是在集群环境下,存在多个进程消费一个stock的可能。

配置nginx反向代理,然后idea运行两个SpringBoot进程,端口不同,分别为9090,9092.

    upstream myServer {
        server 192.168.2.103:9090 weight=1;
        server 192.168.2.103:9092 weight=1;
    }

    server {
        listen 80;
        location / {
            proxy_pass http://myServer;
        }
    }

配置jmeter自动化工具, 开启1000个线程,循环3次,调用nginx端口,再由nginx反向代理到两个进程。redis_lock_jmeter

测试结果输出如下,可见两个进程都消费了40,38 这两个库存

# 9090端口
...
sell product , current stock:[40] , surplus stock: [39]
sell product , current stock:[38] , surplus stock: [37]
...

# 9092
sell product , current stock:[40] , surplus stock: [39]
sell product , current stock:[38] , surplus stock: [37]
Redis分布式锁

在redis中,存在setnx指令集,redis在执行这条指令之前会判断key是否已存在,key不存在的时候才会执行,并且返回1,如果key已存在,则不会执行,并且返回0。

改造sellProduct接口,使用redistemplate的setIfAbsent方法,并且同时设置超时时间。

如果没有设置超时时间的话,万一线程在获取锁后宕机了,没来得及释放锁,也就是没有删除key,那么就会造成其他的线程无法获得锁。

为了防止代码出现异常无法删除key,应在finally代码块中删除key,而且在删除前,应判断是否是当前线程加的锁,否则可能删除掉其他线程加的锁。

代码如下:

   @RequestMapping("sellProduct")
    public String sellProduct() {
        // setnx锁的key
        String redisLockKey = rediskey + "lock";

        ValueOperations<String, String> stringValueOperations = stringRedisTemplatel.opsForValue();
        //生成一个uuid作为线程标识
        String uuid = UUID.randomUUID().toString();
        //redistemplate为我们包装了setnx指令,这里执行setnx指令获取锁并且指定超时时间。
        Boolean redisLock = stringValueOperations.setIfAbsent(redisLockKey, uuid, 10, TimeUnit.SECONDS);
        try {
            if (!redisLock) {
                return "操作频繁";
            }
            //查询库存
            Integer stock = Integer.valueOf(stringValueOperations.get(rediskey + "stock"));
            if (stock <= 0) {
                //库存不足
                return "库存不足";
            }
            log.info("sell product , current stock:[{}] , surplus stock: [{}]", stock, --stock);
            stringValueOperations.set(rediskey + "stock", String.valueOf(stock));
        } finally {
            //判断 这把锁是否是当前线程加的锁,是当前线程加的锁才删除掉
            if (uuid.equals(stringValueOperations.get(redisLockKey))) {
                //删除key,释放锁
                stringRedisTemplatel.delete(redisLockKey);
            }
        }
        return "成功售出";
    }

这时候再次开启jmeter,已经不会出现重复消费的问题了,一般来讲,做到这里已经足够了但是还会遇到一些问题。

  • 比如说在删除key之前,这个线程宕了,那么就会导致key在超时时间之前无法被释放,从而造成死锁。
  • 假设锁的超时时间是10秒,而某一个线程获取锁后需要执行15秒,那么就会导致线程还没执行完毕,而锁已经自动释放,这时候其他线程获取锁后,存在资源竞争的问题。
Redis lua脚本原子操作

以下代码,可能存在的问题是,在执行equals方法之后,执行delete之前宕机了,那么就会导致无法删除key。

也就是无法保证get、delete 两个操作的原子性。

//判断 这把锁是否是当前线程加的锁,是当前线程加的锁才删除掉
if (uuid.equals(stringValueOperations.get(redisLockKey))) {
    //删除key,释放锁
    stringRedisTemplatel.delete(redisLockKey);
}

用lua脚本来保障原子性

-- 获取指定key的内容,并且放到变量里 uuid
local uuid = redis.call("get",KEYS[1])
-- 判断uuid 是否与传递进来的uuid一致
if (uuid == ARGV[1]) then
 		-- 删除key  
    redis.call("del",KEYS[1])
  	-- 返回 1
    return 1
else
    --返回 0
    return 0
-- 脚本结束
end

使用RedisTemplate 执行lua脚本。lua脚本可以一次性执行多条redis命令,以此保障原子性。

            DefaultRedisScript<Number> redisScript = new DefaultRedisScript<>();
            redisScript.setResultType(Number.class);
            redisScript.setScriptText(
                    "local uuid = redis.call(\"get\",KEYS[1])\n" +
                            "if (uuid == ARGV[1]) then\n" +
                            "    redis.call(\"del\",KEYS[1])\n" +
                            "    return 1\n" +
                            "else\n" +
                            "    return 0\n" +
                            "end");
            List<String> keys = Collections.singletonList(redisLockKey);
            Number execute = stringRedisTemplatel.execute(
                    redisScript,
                    keys,
                    uuid
            );
						//这里的返回值,就是在lua脚本写的return所返回的
            int result = execute.intValue();
            log.info("execute:[{}]", result);
Redisson 分布式锁

之前提到过,假设setnx锁的超时时间设置为10秒,而某一个线程执行需要15秒,那么这个线程还未执行结束,其他线程会获取到锁,可能会产生资源竞争。那么要解决这个问题的话,有一套方案是为这把锁续命,就是开启一个子线程,然后延迟(超时时间 / 3)重新设置锁的超时时间。

Redisson已经提供了相应的实现,直接使用就可以。

添加pom依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.1</version>
</dependency>

配置redisson

@Configuration
public class RedissonConfiguration {
    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://127.0.0.1:6379")
                .setRetryInterval(5000)
                .setTimeout(10000)
                .setConnectTimeout(10000);
        return Redisson.create(config);
    }
}

实现代码

@Slf4j
@RequestMapping("/redis/lock")
@RestController
public class RedisLockController {

    @Autowired
    private StringRedisTemplate stringRedisTemplatel;
    @Autowired
    private RedissonClient redissonClient;

    // 1001商品的 redis key
    private String rediskey = "product:1001:";

    @PostConstruct
    public void initStock() {
        //初始化库存100
        stringRedisTemplatel.opsForValue().set(rediskey + "stock", String.valueOf(50));
        log.info("init stock 100");
    }

    //卖出商品接口
    @RequestMapping("sellProduct")
    public String sellProduct() {
        // setnx锁的key
        String redisLockKey = rediskey + "lock";
        //初始化锁
        RLock lock = redissonClient.getLock(redisLockKey);
        try {
            //执行加锁操作,如果没有竞争到锁,则会自旋,等待下一次获取锁。
            lock.lock();
            ValueOperations<String, String> stringValueOperations = stringRedisTemplatel.opsForValue();
            //查询库存
            Integer stock = Integer.valueOf(stringValueOperations.get(rediskey + "stock"));
            if (stock <= 0) {
                //库存不足
                return "库存不足";
            }
            log.info("sell product , current stock:[{}] , surplus stock: [{}]", stock, --stock);
            stringValueOperations.set(rediskey + "stock", String.valueOf(stock));
        } finally {
            //释放锁
            lock.unlock();
        }
        return "成功售出";
    }
}

有一点需要注意的是,lock.lock(); 是阻塞操作,当它获取锁失败,会自旋,一直获取锁。

如果不需要使用自旋锁,可以用,lock.tryLock() ;api 它会返回一个boolean类型的返回值 。