授课语音

Redis消息队列

1. 介绍

Redis 的消息机制主要包括以下几种方式:

  • 发布订阅 (Pub/Sub):这是一种消息通信模式,发布者将消息发送到特定的频道,所有订阅该频道的用户都会接收到这些消息。它不支持持久化,适合实时消息推送和事件通知系统等场景。

  • 消息队列:这是基于 Redis 列表实现的,支持生产者-消费者模式,但不支持持久化。适合单个生产者和消费者场景。

  • Stream:这是一个高效处理数据流的机制,支持持久化、消息确认等功能,支持多个消费者分组处理消息,从而实现负载均衡。适合处理高吞吐量的消息传递、日志记录等场景。在实际开发中,如果使用 Redis 的消息机制,推荐使用 Stream

  • Redis 延迟队列:这是一种任务调度机制,可以用于在特定时间后执行任务(例如红包过期、未支付订单过期等业务场景)。它内部使用 Redis 的有序集合 (sorted set) 实现,通常建议使用 Redisson 来简化 Redis 的延迟队列使用。

2. 代码案例

下面的案例是用 Java 的 POM 项目实现的 Stream 消息机制和延迟队列(依赖 Redisson)。我们可以实际运行这些代码来查看效果。

Maven 依赖

首先,在 pom.xml 中添加以下依赖:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.4.6</version>
</dependency>

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.18.0</version>
</dependency>

消息生产者

接下来是消息生产者的实现代码:

// 消息的生产者,可以从控制台输入消息
package com.zhilitech.messagequeue;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;

import java.util.Map;
import java.util.Scanner;

public class Producer {
    private static final String STREAM_NAME = "mystream"; // 定义流的名称

    public static void main(String[] args) {
        // 使用 Jedis 连接到 Redis 服务器
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入要发送到流中的消息。输入 'exit' 退出。");

            while (true) {
                String message = scanner.nextLine(); // 从控制台读取输入
                if ("exit".equalsIgnoreCase(message)) {
                    break; // 如果输入 'exit',则退出循环
                }
                // 将消息添加到流中
                jedis.xadd(STREAM_NAME, StreamEntryID.NEW_ENTRY, Map.of("message", message));
                System.out.println("消息已添加到流中: " + message);
            }
        }
    }
}

消息消费者

下面是消息消费者的实现代码:

// 消息的消费者
package com.zhilitech.messagequeue;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.resps.StreamEntry;

import java.util.List;
import java.util.Map;

public class Consumer {
    private static JedisPool jedisPool; // Redis 连接池

    private static void setupJedis() {
        // 配置连接池
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(10); // 设置最大连接数
        poolConfig.setMinIdle(2);   // 设置最小空闲连接数
        poolConfig.setMaxIdle(5);   // 设置最大空闲连接数
        poolConfig.setTestOnBorrow(true); // 借用连接前进行验证
        String redisHost = "localhost"; // Redis 主机
        int redisPort = 6379; // Redis 端口
        String redisPassword = null; // Redis 密码
        int redisDb = 0; // Redis 数据库索引
        jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword, redisDb); // 创建连接池
    }

    private static final String STREAM_NAME = "mystream"; // 定义流的名称
    private static final String GROUP_NAME = "mygroup"; // 定义消费组名称
    private static final String CONSUMER_NAME = "consumer1"; // 定义消费者名称

    public static void main(String[] args) {
        setupJedis(); // 初始化 Redis 连接池
        try (Jedis jedis = jedisPool.getResource()) {
            System.out.println("消费者已启动。等待消息...");

            // 创建消费者组
            try {
                jedis.xgroupCreate(STREAM_NAME, GROUP_NAME, StreamEntryID.LAST_ENTRY, true);
            } catch (Exception e) {
                System.out.println("组已存在");
            }

            while (true) {
                // 获取消费组的消息
                List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME,
                        XReadGroupParams.xReadGroupParams().count(1).block(0), Map.of(STREAM_NAME, StreamEntryID.UNRECEIVED_ENTRY));

                // 处理每条消息
                for (Map.Entry<String, List<StreamEntry>> entry : entries) {
                    for (StreamEntry streamEntry : entry.getValue()) {
                        System.out.println("已消费: " + streamEntry.getFields().get("message")); // 打印消费的消息
                        // 确认消息
                        jedis.xack(STREAM_NAME, GROUP_NAME, streamEntry.getID());
                    }
                }
                // 如果没有消息,稍作等待
                if (entries.isEmpty()) {
                    try {
                        Thread.sleep(1000); // 休眠 1 秒
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // 恢复中断状态
                    }
                }
            }
        }
    }
}

延迟队列实现

最后是延迟队列的实现代码:

// 延迟队列示例
package com.zhilitech.delaytask;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.config.Config;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayedTaskExample {

    // Redisson 配置
    private static final String REDIS_URL = "redis://127.0.0.1:6379";

    // 订单过期时间(秒)
    private static final long ORDER_EXPIRATION_TIME = 10;

    // 不同红包类型的过期时间(分钟)
    private static final long RED_PACKET_TYPE_A_EXPIRATION_TIME = 1;
    private static final long RED_PACKET_TYPE_B_EXPIRATION_TIME = 2;
    private static final long RED_PACKET_TYPE_C_EXPIRATION_TIME = 3;

    // 创建线程池
    private static final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

    public static void main(String[] args) throws InterruptedException {
        // 创建 Redisson 客户端
        Config config = new Config();
        config.useSingleServer().setAddress(REDIS_URL);
        RedissonClient redissonClient = Redisson.create(config);

        // 处理订单过期任务
        handleOrderExpiration(redissonClient);

        // 处理不同类型红包的过期任务
        handleRedPacketExpiration(redissonClient, "typeA", RED_PACKET_TYPE_A_EXPIRATION_TIME);
        handleRedPacketExpiration(redissonClient, "typeB", RED_PACKET_TYPE_B_EXPIRATION_TIME);
        handleRedPacketExpiration(redissonClient, "typeC", RED_PACKET_TYPE_C_EXPIRATION_TIME);

        // 保持主线程运行,以便观察任务处理结果
        Thread.sleep(15 * 60 * 1000); // 15 分钟
        redissonClient.shutdown(); // 关闭 Redisson 客户端
        executorService.shutdown(); // 关闭线程池
    }

    private static void handleOrderExpiration(RedissonClient redissonClient) {
        // 创建订单队列
        RBlockingQueue<String> queue = redissonClient.getBlockingQueue("orderQueue");

        // 创建一个延迟队列,用于处理订单过期
        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);

        // 添加订单到延迟队列中,10 秒后过期
        String orderId = "order12345";
        delayedQueue.offer(orderId, ORDER_EXPIRATION_TIME, TimeUnit.SECONDS);

        // 使用线程池处理订单过期任务
        executorService.submit(() -> {
            while (true) {
                try {
                    // 阻塞式获取队列中的任务
                    String expiredOrderId = queue.take(); // 这会阻塞直到有任务到达
                    if (expiredOrderId != null) {
                        // 订单过期处理逻辑
                        System.out

.println("订单 " + expiredOrderId + " 已过期!");
                        // 可以在这里添加更多的过期处理逻辑,例如通知系统管理员或更新数据库状态等
                    }
                } catch (InterruptedException e) {
                    // 处理中断异常
                    System.err.println("处理订单过期任务的线程被中断!");
                    Thread.currentThread().interrupt(); // 保持中断状态
                } catch (Exception e) {
                    // 处理其他异常
                    e.printStackTrace();
                }
            }
        });
    }

    private static void handleRedPacketExpiration(RedissonClient redissonClient, String type, long expirationTime) {
        // 创建红包队列
        RBlockingQueue<String> queue = redissonClient.getBlockingQueue("redPacketQueue:" + type);

        // 创建一个延迟队列,用于处理红包过期
        RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);

        // 添加不同类型的红包到延迟队列中
        for (int i = 1; i <= 5; i++) {
            String redPacketId = type + "_redPacket_" + i;
            delayedQueue.offer(redPacketId, expirationTime, TimeUnit.MINUTES);
        }

        // 使用线程池处理红包过期任务
        executorService.submit(() -> {
            while (true) {
                try {
                    // 阻塞式获取队列中的任务
                    String expiredRedPacketId = queue.take(); // 这会阻塞直到有任务到达
                    if (expiredRedPacketId != null) {
                        // 红包过期处理逻辑
                        System.out.println("红包 " + expiredRedPacketId + " 已过期!");
                        // 可以在这里添加更多的过期处理逻辑,例如通知用户或更新数据库状态等
                    }
                } catch (InterruptedException e) {
                    // 处理中断异常
                    System.err.println("处理红包过期任务的线程被中断!");
                    Thread.currentThread().interrupt(); // 保持中断状态
                } catch (Exception e) {
                    // 处理其他异常
                    e.printStackTrace();
                }
            }
        });
    }
}

通过以上的代码示例,您可以看到如何使用 Redis 的消息机制和延迟队列来处理实时消息和定时任务。这些实现为应用程序提供了强大的消息传递和任务调度功能,适用于多种场景。

去1:1私密咨询

系列课程: