分布式锁其实可以理解为:控制分布式系统有序的去对共享资源进行操作,通过互斥来保持一致性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package com.example.redis.tool;


import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* @author:xuecx
* @descript:<p>利用redis实现分布式锁</p>
* @date:2018/10/12
*/
public class RedisLock1 {
/**
* 加锁
*
* @param redisTemplate redis连接
* @param lockKey 锁名称
* @param retry 从试次数
* @param requestId 请求id
* @param expire 过期时间 单位s
* @return
*/
public static boolean lock(RedisTemplate redisTemplate, String lockKey, Integer retry, String requestId, int expire) {
for (int i = 0; i < retry; i++) {
//如果键不存在则新增,存在则不改变已经有的值
Boolean hasLock = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId);
if (hasLock) {
//设置过期时间
redisTemplate.expire(lockKey, expire, TimeUnit.MILLISECONDS);
return true;
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return false;
}

/**
* 释放锁,支持集群和单机
*
* @param redisTemplate
* @param lockKey 锁名称
* @param requestId 请求id
* @return
*/
public static synchronized boolean unLock(RedisTemplate redisTemplate, String lockKey, String requestId) {
String UNLOCK_LUA="if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
List<String> lockKeys = Collections.singletonList(lockKey);
List<String> requestIds = Collections.singletonList(requestId);
RedisCallback<Long> callback = (RedisConnection connection) -> {
Object nativeConnection = connection.getNativeConnection();
// 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
// 集群模式
if (nativeConnection instanceof JedisCluster) {
return (Long) ((JedisCluster) nativeConnection).eval(UNLOCK_LUA,lockKeys, requestIds);
}
// 单机模式
else if (nativeConnection instanceof Jedis) {
return (Long) ((Jedis) nativeConnection).eval(UNLOCK_LUA, lockKeys, requestIds);
}
return 0L;
};
Long result = (Long) redisTemplate.execute(callback);
return result != null && result > 0;
}
}



一般情况上述分布式锁已经能满足很多场景了,

但是特殊情况,在 ==if (hasLock) {== 行的时候,系统挂了,那么就死锁了;

原因是加锁的地方不是原子操作。

将代码改进

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 加锁
*
* @param redisTemplate redis连接
* @param lockKey 锁名称
* @param retry 从试次数
* @param requestId 请求id
* @param expire 过期时间 单位s
* @return
*/
public static boolean lock(RedisTemplate redisTemplate, String lockKey, Integer retry, String requestId, int expire) {
int i = 0;
while (true) {
try {
RedisCallback<String> stringRedisCallback = new RedisCallback() {
@Override
public Object doInRedis(RedisConnection redisConnection) {
JedisCommands nativeConnection = (JedisCommands) redisConnection.getNativeConnection();
return nativeConnection.set(lockKey, requestId, "NX", "PX", expire);
}
};
String execute = (String) redisTemplate.execute(stringRedisCallback);
return !execute.isEmpty();
} catch (Exception e) {

}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {

}
i++;
if (i > retry) {
return false;
}
}
}

问题

1、锁过期了业务还没有完成怎么办?

目前只能通过对业务的测试,找到一个最大的业务执行时间,然后设置一个最大的业务执行时间。