专业编程基础技术教程

网站首页 > 基础教程 正文

大厂宝典-Redis(25):如何实现延迟队列(2)?

ccvgpt 2024-09-10 13:04:44 基础教程 12 ℃

PS:创作不易,感谢阅读,希望对读者有所帮助,喜欢的话可以【关注】支持一下,作者会持续更新 Java 生态圈常见知识。黑色加粗为重点关注内容!

分片优化

  • 效率分析
    • 脚本一共用到了四个命令:返回有序成员列表 zrangeByScore、在有序集中移除元素 zrem、设置 Hash 表 Hmget、从 Hash 表删除 Hdel
    • 可以看到ZREM命令的耗时是最大的,若有5000条数据,每次移除100条,则时间复杂度为 100 * log(5000),因此减少有序集合元素个数,对于降低Lua脚本运行的耗时是有一定帮助的

命令

大厂宝典-Redis(25):如何实现延迟队列(2)?

时间复杂度

参数说明

ZREVRANGEBYSCORE

O(log(N)+M)

N是有序集合中的元素总数,M是返回的元素的数量

ZREM

O(M*log(N))

N是有序集合中的元素总数,M是成功移除的元素的数量

HMGET

O(L)

L是成功返回的域的数量

HDEL

O(L)

L是要删除的域的数量

  • 单个Redis实例,对ZSet和Hash两个集合的数据进行分片
    • 假设分片数量为2,以业务Id(userId)对分片数量取模,组装不同的 key,由 key 决定在哪个分片
  • 多个 Redis 实例分片
    • 利用哈希槽进行数据分片
    • 单Redis实例分片其实存在一个问题,就是Redis实例总是单线程处理客户端的命令,即使客户端是多个线程执行Redis命令
    • 虽然通过分片降低了Lua脚本命令的复杂度,但是Redis的命令处理模型(单线程)也有可能成为另一个性能瓶颈隐患

延迟队列比较

方案

优势

劣势

选用场景

JDK 内置的延迟队列 DelayQueue

实现简单

数据内存态,不可靠

一致性相对低的场景

调度框架和 MySQL 进行短间隔轮询

实现简单,可靠性高

存在明显的性能瓶颈

数据量较少,实时性相对低的场景

RabbitMQ的死信队列

异步交互可以削峰

延时的时间长度不可控,如果数据需要持久化则性能会降低


Redis进行短间隔轮询

数据持久化,高性能

实现难度大

常见于支付结果回调方案

lua脚本、核心代码

-- /lua/enqueue.lua
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local zset_value = ARGV[1]
local zset_score = ARGV[2]
local hash_field = ARGV[3]
local hash_value = ARGV[4]
redis.call('ZADD', zset_key, zset_score, zset_value)
redis.call('HSET', hash_key, hash_field, hash_value)
return nil

-- /lua/dequeue.lua
-- 参考jesque的部分Lua脚本实现
local zset_key = KEYS[1]
local hash_key = KEYS[2]
local min_score = ARGV[1]
local max_score = ARGV[2]
local offset = ARGV[3]
local limit = ARGV[4]
-- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代
local status, type = next(redis.call('TYPE', zset_key))
if status ~= nil and status == 'ok' then
    if type == 'zset' then
        local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit)
        if list ~= nil and #list > 0 then
            -- unpack函数能把table转化为可变参数
            redis.call('ZREM', zset_key, unpack(list))
            local result = redis.call('HMGET', hash_key, unpack(list))
            redis.call('HDEL', hash_key, unpack(list))
            return result
        end
    end
end
return nil
public void consumer() {
  Executors.newSingleThreadExecutor().submit(new Runnable() {
    @Override
    public void run() {
      while (true) {
        Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(key, 0, System.currentTimeMillis(), 0, 10);
        if (taskIdSet == null || taskIdSet.isEmpty()) {
          System.out.println("没有任务");
  
        } else {
          taskIdSet.forEach(id -> {
            long result = RedisOps.getJedis().zrem(key, id);
            if (result == 1L) {
              System.out.println("从延时队列中获取到任务,taskId:" + id + " , 当前时间:" + LocalDateTime.now());
            }
          });
        }
        try {
          /* 每5秒触发一次 */
          TimeUnit.MILLISECONDS.sleep(5000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  });
}

Tags:

最近发表
标签列表