PS:创作不易,感谢阅读,希望对读者有所帮助,喜欢的话可以【关注】支持一下,作者会持续更新 Java 生态圈常见知识。黑色加粗为重点关注内容!
分片优化
- 效率分析
- 脚本一共用到了四个命令:返回有序成员列表 zrangeByScore、在有序集中移除元素 zrem、设置 Hash 表 Hmget、从 Hash 表删除 Hdel
- 可以看到ZREM命令的耗时是最大的,若有5000条数据,每次移除100条,则时间复杂度为 100 * log(5000),因此减少有序集合元素个数,对于降低Lua脚本运行的耗时是有一定帮助的
命令 | 时间复杂度 | 参数说明 |
ZREVRANGEBYSCORE | O(log(N)+M) | N是有序集合中的元素总数,M是返回的元素的数量 |
ZREM | O(M*log(N)) | N是有序集合中的元素总数,M是成功移除的元素的数量 |
HMGET | O(L) | L是成功返回的域的数量 |
HDEL | O(L) | L是要删除的域的数量 |
- 单个Redis实例,对ZSet和Hash两个集合的数据进行分片
- 假设分片数量为2,以业务Id(userId)对分片数量取模,组装不同的 key,由 key 决定在哪个分片
- 多个 Redis 实例分片
- 利用哈希槽进行数据分片
- 单Redis实例分片其实存在一个问题,就是Redis实例总是单线程处理客户端的命令,即使客户端是多个线程执行Redis命令
- 虽然通过分片降低了Lua脚本命令的复杂度,但是Redis的命令处理模型(单线程)也有可能成为另一个性能瓶颈隐患
延迟队列比较
方案 | 优势 | 劣势 | 选用场景 |
JDK 内置的延迟队列 DelayQueue | 实现简单 | 数据内存态,不可靠 | 一致性相对低的场景 |
调度框架和 MySQL 进行短间隔轮询 | 实现简单,可靠性高 | 存在明显的性能瓶颈 | 数据量较少,实时性相对低的场景 |
RabbitMQ的死信队列 | 异步交互可以削峰 | 延时的时间长度不可控,如果数据需要持久化则性能会降低 | |
Redis进行短间隔轮询 | 数据持久化,高性能 | 实现难度大 | 常见于支付结果回调方案 |
lua脚本、核心代码
-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil
-- /lua/dequeue.lua
-- 参考jesque的部分Lua脚本实现
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
if type == 'zset' then
local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
if list ~= nil and #list > 0 then
-- unpack函数能把table转化为可变参数
redis.call('ZREM', zset_key, unpack(list))
local result = redis.call('HMGET', hash_key, unpack(list))
redis.call('HDEL', hash_key, unpack(list))
return result
end
end
end
return nil
public void consumer() {
Executors.newSingleThreadExecutor().submit(new Runnable() {
@Override
public void run() {
while (true) {
Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(key, 0, System.currentTimeMillis(), 0, 10);
if (taskIdSet == null || taskIdSet.isEmpty()) {
System.out.println("没有任务");
} else {
taskIdSet.forEach(id -> {
long result = RedisOps.getJedis().zrem(key, id);
if (result == 1L) {
System.out.println("从延时队列中获取到任务,taskId:" + id + " , 当前时间:" + LocalDateTime.now());
}
});
}
try {
/* 每5秒触发一次 */
TimeUnit.MILLISECONDS.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}