背景
随着微服务的普及,越来越多的应用开始使用多节点互备的模式进行部署,这就可能会带来一个新的问题。在单节点的情况下,我们可以简单地使用Java的锁机制进行方法锁或类锁,但是在多节点的情况下,应该如何保证某个方法在同一时间点或同一时间段内,只被调用一次呢?
本文将基于Redis缓存及Java的AOP机制设计实现一种分布式方法锁,用来应对多节点情况下如跑批、定时任务、调度等问题。
锁注解
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY_GETTER)
@Retention(AnnotationRetention.RUNTIME)
annotation class DistributedLock(
val lockPrefix: String = "",
val lockKey: String = "",
val retryTimes: Int = 3,
val timeOut: Long = 5,
val timeUnit: TimeUnit = TimeUnit.SECONDS)
首先定义好分布式锁的相关属性如:锁前缀、锁名、重试次数、锁超时时间、时间单位。
在Redis缓存下,使用锁前缀作为不同锁分组的命名空间,锁名表示需加锁的方法名;重试次数表示在获取锁失败的情况下,进程可重新请求锁的最大重试次数;锁超时时间表示在该指定时间后,若新进程无法获取锁则进行强制解锁。
注解切面
定义切面的环绕方法:线程对方法进行加锁,若加锁失败则结束线程,若加锁成功则执行方法,最终对方法解锁。
fun lockAroundAction(joinPoint: ProceedingJoinPoint) {
if (this.lock(joinPoint, 0)) {
try {
joinPoint.proceed()
} catch (throwable: Throwable) {
throw DistributedLockException("分布式锁异常,${throwable.message}", throwable)
} finally {
this.unlock(joinPoint)
}
} else logger.warn("无法调用, 有其他线程正在执行该任务")
}
加锁方法如下:
private fun lock(joinPoint: ProceedingJoinPoint, count: Int): Boolean {
val annotationArgs = this.getAnnotationArgs(joinPoint)
return if (annotationArgs.isEmpty()) false
else {
val lockPrefix = annotationArgs["prefix"].toString()
val key = annotationArgs["key"].toString()
val expire = annotationArgs["expire"].toString().toLong()
val retryTime = annotationArgs["retry"].toString().toInt()
if (lockPrefix.isBlank() || key.isBlank()) {
throw DistributedLockException("CacheLock锁前缀或锁名未指定")
}
if (commonLockHelper.addLock(lockPrefix, key, expire)) {
return true
} else {
// 获取锁创建时间
val createTime = commonLockHelper.getLockTime(lockPrefix, key)
// 当前时间已锁超时时间,强制解锁并开始线程
if (System.currentTimeMillis() - createTime > expire) {
logger.info("锁超时,强制解锁")
commonLockHelper.remove(lockPrefix, key)
lock(joinPoint, count.plus(1))
} else {
// 超过最大可重试次数,结束请求
if (count >= retryTime) {
logger.warn("请求锁超过可重试上限, 中断请求")
return false
} else {
// 下次请求在等待 1 << (count+1) 秒后执行
val waitTime = PROTECT_TIME shl count
logger.info("获取锁失败,等待${waitTime}秒后重试")
Thread.sleep(PROTECT_TIME shl count)
lock(joinPoint, count.plus(1))
}
}
}
}
}
获取锁属性:
private fun getAnnotationArgs(joinPoint: ProceedingJoinPoint): Map<String, Any> {
val target = joinPoint.target.javaClass
val methods = target.methods
val methodName = joinPoint.signature.name
for (method in methods) {
if (method.name.equals(methodName, true)) {
val redisLock = method.getAnnotation(DistributedLock::class.java)
val expire = TimeoutUtil.toMillis(redisLock.timeOut, redisLock.timeUnit)
return mapOf("prefix" to redisLock.lockPrefix, "key" to redisLock.lockKey, "expire" to expire, "retry" to redisLock.retryTime)
}
}
return emptyMap()
}
解锁方法如下:
private fun unlock(joinPoint: ProceedingJoinPoint) {
val annotationArgs = this.getAnnotationArgs(joinPoint)
val lockPrefix = annotationArgs["prefix"].toString()
val key = annotationArgs["key"].toString()
if (lockPrefix.isBlank() || key.isBlank()) {
throw DistributedLockException("CacheLock锁前缀或锁名未指定")
} else commonLockHelper.remove(lockPrefix, key)
}
自定义异常
class DistributedLockException : RuntimeException {
constructor() : super()
constructor(message: String) : super(message)
constructor(message: String, cause: Throwable) : super(message, cause)
constructor(cause: Throwable) : super(cause)
}
RedisLockHelper工具类
为后续的可扩展性,抽象出CommonLockHelper
接口:
interface CommonLockHelper {
fun addLock(track: String, sector: String, timeout: Long): Boolean
fun remove(track: String, sector: String)
fun getLockTime(track: String, sector: String): Long
}
RedisLockHelper的实现如下:
class RedisLockHelper(val redisTemplate: RedisTemplate<String, String>) : CommonLockHelper {
override fun addLock(track: String, sector: String, timeout: Long): Boolean {
val valueOperations = redisTemplate.opsForValue()
val cacheKey = "$track:$sector"
val flag = valueOperations.setIfAbsent(cacheKey, System.currentTimeMillis().toString()) ?: false
if (flag) valueOperations.set(cacheKey, getLockTime(track, sector).toString(), timeout, TimeUnit.SECONDS)
return flag
}
override fun remove(track: String, sector: String) {
redisTemplate.delete("$track:$sector")
}
override fun getLockTime(track: String, sector: String): Long {
val valueOperations = redisTemplate.opsForValue()
return valueOperations.get("$track:$sector")?.toLong() ?: 0
}
}
在加锁过程中,首先尝试使用当前时间戳写入缓存cacheKey。
- 如果写入失败,表示当前锁已被占用,返回加锁失败状态。
- 如果写入成功,则此时获取锁成功,将之前写入的时间戳覆盖更新缓存cacheKey并设置该缓存的超时时间,返回加锁成功。
配置
最后进行分布式锁的工具类注入配置:
@Bean
fun commonLockHelper(): CommonLockHelper {
return RedisLockHelper(redisTemplate)
}
测试
- 测试方法,使用Thread.sleep()方法使线程强制耗时5s左右,并将该service包装成接口。
@Async
@DistributedLock("author", "name", 5)
fun getName() {
println("${DateUtil.format(Date(), "yyyy-MM-dd HH:mm:ss.SSS")} romani")
Thread.sleep(5000)
}
使用Postman连续请求不同端口的两个该服务实例,可以观察到日志如下:
在重试3次后,进程B获取锁并成功执行方法。
2. 将锁超时时间设置为1秒后重新测试,观察日志如下:
@DistributedLock("author", "name", 5, 1)
请求时与锁创建时的差值已经大于锁的超时时间,此时强制解锁并执行进程B。
3. 将锁重试次数设置为1,超时时间设置为10s后重新测试,观察日志如下:
@DistributedLock("author", "name", 1, 10)