SpringBoot集成Redisson延迟队列

jupiter
2023-11-03 / 0 评论 / 13 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2023年11月03日,已超过175天没有更新,若内容或图片失效,请留言反馈。

0. 使用场景

  1. 下单成功,30分钟未支付。支付超时,自动取消订单
  2. 订单签收,签收后7天未进行评价。订单超时未评价,系统默认好评
  3. 下单成功,商家5分钟未接单,订单取消
  4. 配送超时,推送短信提醒

1.Redisson延迟队列原理

redisson 使用了 两个list + 一个 sorted-set + pub/sub 来实现延时队列,而不是单一的sort-set。

  • sorted-set:存放未到期的消息&到期时间,提供消息延时排序功能
  • list1:存放未到期消息,作为消息的原始顺序视图,提供如查询、删除指定第几条消息的功能(分析源码得出的,查看哪些地方有使用这个list)
  • list2:消费队列,存放到期后的消息,提供消费

整体流程(对应画图PPT链接):

结合源码分析:

  • org.redisson.RedissonDelayedQueue#RedissonDelayedQueue
    首先创建延时队列的时候,会创建一个QueueTransferTask, 在里面会订阅一个topic,订阅成功后,执行其pushTask方法,里面会查询sorted-set中100个已到期的消息,将其push到lis2中,并从sorted-set和list1中移除。(这里是为了投递历史未处理的消息)

    protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
        super(codec, commandExecutor, name);
        channelName = prefixName("redisson_delay_queue_channel", getRawName());
        queueName = prefixName("redisson_delay_queue", getRawName());
        timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());
        
        QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                      + "if #expiredValues > 0 then "
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"
                              + "redis.call('rpush', KEYS[1], value);"
                              + "redis.call('lrem', KEYS[3], 1, v);"
                          + "end; "
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                      + "end; "
                        // get startTime from scheduler queue head task
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                      + "if v[1] ~= nil then "
                         + "return v[2]; "
                      + "end "
                      + "return nil;",
                      Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
                      System.currentTimeMillis(), 100);
            }
            
            @Override
            protected RTopic getTopic() {
                return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };
        
        queueTransferService.schedule(queueName, task);
        
        this.queueTransferService = queueTransferService;
    }
    
  • org.redisson.RedissonDelayedQueue#offerAsync(V, long, java.util.concurrent.TimeUnit)
    发送延时消息时,会将消息写入 list1和 sorted-set 中,msg会添加一个randomId,支持发送相同的消息。并且判断sorted-set首条消息如果是刚插入的,则publish timeout(到期时间) 到 topic

    public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
        if (delay < 0) {
            throw new IllegalArgumentException("Delay can't be negative");
        }
        
        long delayInMs = timeUnit.toMillis(delay);
        long timeout = System.currentTimeMillis() + delayInMs;
     
        long randomId = ThreadLocalRandom.current().nextLong();
        return commandExecutor.evalWriteAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" 
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"
              + "redis.call('rpush', KEYS[3], value);"
              // if new object added to queue head when publish its startTime 
              // to all scheduler workers 
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "
              + "if v[1] == value then "
                 + "redis.call('publish', KEYS[4], ARGV[1]); "
              + "end;",
              Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),
              timeout, randomId, encode(e));
    }
  • org.redisson.QueueTransferTask#scheduleTask
    订阅到topic消息后,会先判断其是否临期(delay<10ms),如果是则调用pushTask方法,不是则启动一个定时任务(使用的netty时间轮),延时delay后执行pushTask方法。

    // 订阅topic onMessage 时调用
    private void scheduleTask(final Long startTime) {
        TimeoutTask oldTimeout = lastTimeout.get();
        if (startTime == null) {
            return;
        }
        
        if (oldTimeout != null) {
            oldTimeout.getTask().cancel();
        }
        
        long delay = startTime - System.currentTimeMillis();
        if (delay > 10) {
        // 使用 netty 时间轮 启动一个定时任务
            Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
                @Override
                public void run(Timeout timeout) throws Exception {
                    pushTask();
                    
                    TimeoutTask currentTimeout = lastTimeout.get();
                    if (currentTimeout.getTask() == timeout) {
                        lastTimeout.compareAndSet(currentTimeout, null);
                    }
                }
            }, delay, TimeUnit.MILLISECONDS);
            if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
                timeout.cancel();
            }
        } else {
            pushTask();
        }
    }    
    
    private void pushTask() {
        RFuture<Long> startTimeFuture = pushTaskAsync();
        startTimeFuture.onComplete((res, e) -> {
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error(e.getMessage(), e);
                scheduleTask(System.currentTimeMillis() + 5 * 1000L);
                return;
            }
            
            if (res != null) {
                scheduleTask(res);
            }
        });
    }

2.SpringBoot集成

实验环境:SpringBoot版本3.0.12

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.0.12</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

2.1 引入 Redisson 依赖

 <!--redission-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.19.0</version>
</dependency>

2.2 配置文件

spring:
  data:
    redis:
      host: 172.19.236.66
      port: 6379
      #password: 123456
      database: 0
      timeout: 3000

2.3 创建 RedissonConfig 配置

package com.example.redissionstudy.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author LuoJia
 * @version 1.0
 * @description: Redis链接配置文件
 * @date 2023/11/3 8:59
 */
@Configuration
public class RedissonConfig {

    @Value("${spring.data.redis.host}")
    private String host;
    @Value("${spring.data.redis.port}")
    private int port;
    @Value("${spring.data.redis.database}")
    private int database;
    //@Value("${spring.data.redis.password}")
    //private String password;

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://" + host + ":" + port)
                .setDatabase(database);
                //.setPassword(password)
        return Redisson.create(config);
    }
}
  • 测试使用
@SpringBootTest
@Slf4j
class RedissionStudyApplicationTests {
    @Resource
    RedissonClient redissonClient;

    @Test
    void testRedission() {
        //字符串操作
        RBucket<String> rBucket =  redissonClient.getBucket("strKey");
        // 设置value和key的有效期
        rBucket.set("张三", 30, TimeUnit.MINUTES);
        // 通过key获取value
        System.out.println(redissonClient.getBucket("strKey").get());
    }
}
张三
  • redis查看结果
127.0.0.1:6379> keys str*
1) "strKey"
127.0.0.1:6379> get strKey
"\x03\x83\xe5\xbc\xa0\xe4\xb8\x89"

2.4 封装 Redis 延迟队列工具类

package com.example.redissionstudy.utils;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * @author LuoJia
 * @version 1.0
 * @description:  Redission 延迟队列工具类
 * @date 2023/11/3 9:51
 */
@Slf4j
@Component
public class RedisDelayQueueUtil {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 添加延迟队列
     *
     * @param value     队列值
     * @param delay     延迟时间
     * @param timeUnit  时间单位
     * @param queueCode 队列键
     * @param <T>
     */
    public <T> void addDelayQueue(T value, long delay, TimeUnit timeUnit, String queueCode) {
        try {
            RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
            RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            delayedQueue.offer(value, delay, timeUnit);
            log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toSeconds(delay) + "秒");
        } catch (Exception e) {
            log.error("(添加延时队列失败) {}", e.getMessage());
            throw new RuntimeException("(添加延时队列失败)");
        }
    }

    /**
     * 获取延迟队列
     *
     * @param queueCode
     * @param <T>
     * @return
     * @throws InterruptedException
     */
    public <T> T getDelayQueue(String queueCode) throws InterruptedException {
        RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode);
        T value = (T) blockingDeque.take();
        return value;
    }
}

2.5 创建延迟队列业务枚举

package com.example.redissionstudy.enums;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * @author LuoJia
 * @version 1.0
 * @description: 延迟队列业务枚举
 * @date 2023/11/3 9:53
 */
@Getter
@AllArgsConstructor
@NoArgsConstructor
public enum RedisDelayQueueEnum {
    ORDER_PAYMENT_TIMEOUT("ORDER_PAYMENT_TIMEOUT", "订单支付超时,自动取消订单", "orderPaymentTimeout"),
    ORDER_TIMEOUT_NOT_EVALUATED("ORDER_TIMEOUT_NOT_EVALUATED", "订单超时未评价,系统默认好评", "orderTimeoutNotEvaluated");

    /**
     * 延迟队列 RedisKey
     */
    private String code;

    /**
     * 中文描述
     */
    private String name;

    /**
     * 延迟队列具体业务实现的 Bean
     * 可通过 Spring 的上下文获取
     */
    private String beanId;
}

2.6 定义延迟队列执行器

package com.example.redissionstudy.handler;

/**
 * @author LuoJia
 * @version 1.0
 * @description: 延迟队列执行器接口
 * @date 2023/11/3 9:58
 */
public interface RedisDelayQueueHandle<T>{
    void execute(T t);
}

2.7 创建枚举中定义的Bean,并实现延迟队列执行器

  • OrderPaymentTimeout:订单支付超时延迟队列处理类
package com.example.redissionstudy.handler.impl;

import com.example.redissionstudy.enums.RedisDelayQueueEnum;
import com.example.redissionstudy.handler.RedisDelayQueueHandle;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author LuoJia
 * @version 1.0
 * @description: 订单支付超时处理类
 * @date 2023/11/3 10:00
 */
@Component
@Slf4j
public class OrderPaymentTimeout implements RedisDelayQueueHandle<Map> {
    @Override
    public void execute(Map map) {
        log.info("{} {}", RedisDelayQueueEnum.ORDER_PAYMENT_TIMEOUT.getName(), map);
        // TODO 订单支付超时,自动取消订单处理业务...
    }
}
  • OrderTimeoutNotEvaluated:订单超时未评价延迟队列处理类
package com.example.redissionstudy.handler.impl;

import com.example.redissionstudy.enums.RedisDelayQueueEnum;
import com.example.redissionstudy.handler.RedisDelayQueueHandle;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @author LuoJia
 * @version 1.0
 * @description: 订单超时未评价处理类
 * @date 2023/11/3 10:01
 */
@Component
@Slf4j
public class OrderTimeoutNotEvaluated implements RedisDelayQueueHandle<Map> {
    @Override
    public void execute(Map map) {
        log.info("{} {}", RedisDelayQueueEnum.ORDER_TIMEOUT_NOT_EVALUATED.getName(), map);
        // TODO 订单超时未评价,系统默认好评处理业务...
    }
}

2.8 创建延迟队列消费线程,项目启动完成后开启

package listener;

import com.example.redissionstudy.enums.RedisDelayQueueEnum;
import com.example.redissionstudy.handler.RedisDelayQueueHandle;
import com.example.redissionstudy.utils.RedisDelayQueueUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

/**
 * @author LuoJia
 * @version 1.0
 * @description: 启动延迟队列
 * @date 2023/11/3 10:02
 */
@Slf4j
@Component
public class RedisDelayQueueRunner implements CommandLineRunner {
    @Autowired
    private RedisDelayQueueUtil redisDelayQueueUtil;
    @Autowired
    private ApplicationContext applicationContext;
    @Override
    public void run(String... args) {
        new Thread(() -> {
            while (true) {
                try {
                    RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();
                    for (RedisDelayQueueEnum queueEnum : queueEnums) {
                        Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
                        if (value != null) {
                            RedisDelayQueueHandle redisDelayQueueHandle = (RedisDelayQueueHandle) applicationContext.getBean(queueEnum.getBeanId());
                            redisDelayQueueHandle.execute(value);
                        }
                    }
                } catch (InterruptedException e) {
                    log.error("(Redis延迟队列异常中断) {}", e.getMessage());
                }
            }
        }).start();
        log.info("(Redis延迟队列启动成功)");
    }
}

以上步骤,Redis 延迟队列核心代码已经完成,下面我们写一个测试接口,用 PostMan 模拟测试一下

2.9 创建一个测试接口,模拟添加延迟队列

package com.example.redissionstudy.controller;

/**
 * @author LuoJia
 * @version 1.0
 * @description: 延迟队列测试
 * @date 2023/11/3 10:05
 */

import com.example.redissionstudy.enums.RedisDelayQueueEnum;
import com.example.redissionstudy.utils.RedisDelayQueueUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@RestController
public class RedisDelayQueueController {

    @Autowired
    private RedisDelayQueueUtil redisDelayQueueUtil;

    @GetMapping("/addQueue")
    public void addQueue() {
        Map<String, String> map1 = new HashMap<>();
        map1.put("orderId", "100");
        map1.put("remark", "其他信息");

        Map<String, String> map2 = new HashMap<>();
        map2.put("orderId", "200");
        map2.put("remark", "其他信息");

        // 添加订单支付超时,自动取消订单延迟队列。为了测试效果,延迟10秒钟
        redisDelayQueueUtil.addDelayQueue(map1, 10, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_PAYMENT_TIMEOUT.getCode());

        // 订单超时未评价,系统默认好评。为了测试效果,延迟20秒钟
        redisDelayQueueUtil.addDelayQueue(map2, 20, TimeUnit.SECONDS, RedisDelayQueueEnum.ORDER_TIMEOUT_NOT_EVALUATED.getCode());
    }
}
  • 运行结果
2023-11-03T10:09:46.800+08:00  INFO 21480 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2023-11-03T10:09:46.816+08:00  INFO 21480 --- [           main] c.e.r.RedissionStudyApplication          : Started RedissionStudyApplication in 4.888 seconds (process running for 5.743)
2023-11-03T10:09:46.825+08:00  INFO 21480 --- [           main] c.e.r.listener.RedisDelayQueueRunner     : (Redis延迟队列启动成功)
2023-11-03T10:09:47.039+08:00  INFO 21480 --- [-10.108.155.252] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2023-11-03T10:09:47.040+08:00  INFO 21480 --- [-10.108.155.252] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2023-11-03T10:09:47.042+08:00  INFO 21480 --- [-10.108.155.252] o.s.web.servlet.DispatcherServlet        : Completed initialization in 2 ms
2023-11-03T10:10:25.798+08:00  INFO 21480 --- [nio-8080-exec-4] c.e.r.utils.RedisDelayQueueUtil          : (添加延时队列成功) 队列键:ORDER_PAYMENT_TIMEOUT,队列值:{orderId=100, remark=其他信息},延迟时间:10秒
2023-11-03T10:10:25.802+08:00  INFO 21480 --- [nio-8080-exec-4] c.e.r.utils.RedisDelayQueueUtil          : (添加延时队列成功) 队列键:ORDER_TIMEOUT_NOT_EVALUATED,队列值:{orderId=200, remark=其他信息},延迟时间:20秒
2023-11-03T10:10:35.779+08:00  INFO 21480 --- [       Thread-2] c.e.r.handler.impl.OrderPaymentTimeout   : 订单支付超时,自动取消订单 {orderId=100, remark=其他信息}
2023-11-03T10:10:45.860+08:00  INFO 21480 --- [       Thread-2] c.e.r.h.impl.OrderTimeoutNotEvaluated    : 订单超时未评价,系统默认好评 {orderId=200, remark=其他信息}

参考资料

  1. SpringBoot集成Redisson实现延迟队列 - 掘金 (juejin.cn)
  2. SpringBoot集成Redisson实现延迟队列_redssion延时队列订阅_刘鹏博.的博客-CSDN博客
  3. Maven Repository: org.redisson » redisson-spring-boot-starter (mvnrepository.com)
  4. 【进阶篇】Redis实战之Redisson使用技巧详解 - 知乎 (zhihu.com)
  5. Table of Content · redisson/redisson Wiki · GitHub
  6. 浅析 Redisson 的分布式延时队列 RedissonDelayedQueue 运行流程 - 掘金 (juejin.cn)
  7. Redisson分布式延时队列 RedissonDelayedQueue - 掘金 (juejin.cn)
0

评论 (0)

打卡
取消