第2课redis_消息机制_延迟队列
热度🔥:37 免费课程
授课语音
Redis消息队列
1. 介绍
Redis 的消息机制主要包括以下几种方式:
发布订阅 (Pub/Sub):这是一种消息通信模式,发布者将消息发送到特定的频道,所有订阅该频道的用户都会接收到这些消息。它不支持持久化,适合实时消息推送和事件通知系统等场景。
消息队列:这是基于 Redis 列表实现的,支持生产者-消费者模式,但不支持持久化。适合单个生产者和消费者场景。
Stream:这是一个高效处理数据流的机制,支持持久化、消息确认等功能,支持多个消费者分组处理消息,从而实现负载均衡。适合处理高吞吐量的消息传递、日志记录等场景。在实际开发中,如果使用 Redis 的消息机制,推荐使用 Stream。
Redis 延迟队列:这是一种任务调度机制,可以用于在特定时间后执行任务(例如红包过期、未支付订单过期等业务场景)。它内部使用 Redis 的有序集合 (
sorted set
) 实现,通常建议使用 Redisson 来简化 Redis 的延迟队列使用。
2. 代码案例
下面的案例是用 Java 的 POM 项目实现的 Stream 消息机制和延迟队列(依赖 Redisson)。我们可以实际运行这些代码来查看效果。
Maven 依赖
首先,在 pom.xml
中添加以下依赖:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.4.6</version>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.18.0</version>
</dependency>
消息生产者
接下来是消息生产者的实现代码:
// 消息的生产者,可以从控制台输入消息
package com.zhilitech.messagequeue;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import java.util.Map;
import java.util.Scanner;
public class Producer {
private static final String STREAM_NAME = "mystream"; // 定义流的名称
public static void main(String[] args) {
// 使用 Jedis 连接到 Redis 服务器
try (Jedis jedis = new Jedis("localhost", 6379)) {
Scanner scanner = new Scanner(System.in);
System.out.println("请输入要发送到流中的消息。输入 'exit' 退出。");
while (true) {
String message = scanner.nextLine(); // 从控制台读取输入
if ("exit".equalsIgnoreCase(message)) {
break; // 如果输入 'exit',则退出循环
}
// 将消息添加到流中
jedis.xadd(STREAM_NAME, StreamEntryID.NEW_ENTRY, Map.of("message", message));
System.out.println("消息已添加到流中: " + message);
}
}
}
}
消息消费者
下面是消息消费者的实现代码:
// 消息的消费者
package com.zhilitech.messagequeue;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.resps.StreamEntry;
import java.util.List;
import java.util.Map;
public class Consumer {
private static JedisPool jedisPool; // Redis 连接池
private static void setupJedis() {
// 配置连接池
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(10); // 设置最大连接数
poolConfig.setMinIdle(2); // 设置最小空闲连接数
poolConfig.setMaxIdle(5); // 设置最大空闲连接数
poolConfig.setTestOnBorrow(true); // 借用连接前进行验证
String redisHost = "localhost"; // Redis 主机
int redisPort = 6379; // Redis 端口
String redisPassword = null; // Redis 密码
int redisDb = 0; // Redis 数据库索引
jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword, redisDb); // 创建连接池
}
private static final String STREAM_NAME = "mystream"; // 定义流的名称
private static final String GROUP_NAME = "mygroup"; // 定义消费组名称
private static final String CONSUMER_NAME = "consumer1"; // 定义消费者名称
public static void main(String[] args) {
setupJedis(); // 初始化 Redis 连接池
try (Jedis jedis = jedisPool.getResource()) {
System.out.println("消费者已启动。等待消息...");
// 创建消费者组
try {
jedis.xgroupCreate(STREAM_NAME, GROUP_NAME, StreamEntryID.LAST_ENTRY, true);
} catch (Exception e) {
System.out.println("组已存在");
}
while (true) {
// 获取消费组的消息
List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(GROUP_NAME, CONSUMER_NAME,
XReadGroupParams.xReadGroupParams().count(1).block(0), Map.of(STREAM_NAME, StreamEntryID.UNRECEIVED_ENTRY));
// 处理每条消息
for (Map.Entry<String, List<StreamEntry>> entry : entries) {
for (StreamEntry streamEntry : entry.getValue()) {
System.out.println("已消费: " + streamEntry.getFields().get("message")); // 打印消费的消息
// 确认消息
jedis.xack(STREAM_NAME, GROUP_NAME, streamEntry.getID());
}
}
// 如果没有消息,稍作等待
if (entries.isEmpty()) {
try {
Thread.sleep(1000); // 休眠 1 秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
}
}
}
}
延迟队列实现
最后是延迟队列的实现代码:
// 延迟队列示例
package com.zhilitech.delaytask;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.config.Config;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class DelayedTaskExample {
// Redisson 配置
private static final String REDIS_URL = "redis://127.0.0.1:6379";
// 订单过期时间(秒)
private static final long ORDER_EXPIRATION_TIME = 10;
// 不同红包类型的过期时间(分钟)
private static final long RED_PACKET_TYPE_A_EXPIRATION_TIME = 1;
private static final long RED_PACKET_TYPE_B_EXPIRATION_TIME = 2;
private static final long RED_PACKET_TYPE_C_EXPIRATION_TIME = 3;
// 创建线程池
private static final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
public static void main(String[] args) throws InterruptedException {
// 创建 Redisson 客户端
Config config = new Config();
config.useSingleServer().setAddress(REDIS_URL);
RedissonClient redissonClient = Redisson.create(config);
// 处理订单过期任务
handleOrderExpiration(redissonClient);
// 处理不同类型红包的过期任务
handleRedPacketExpiration(redissonClient, "typeA", RED_PACKET_TYPE_A_EXPIRATION_TIME);
handleRedPacketExpiration(redissonClient, "typeB", RED_PACKET_TYPE_B_EXPIRATION_TIME);
handleRedPacketExpiration(redissonClient, "typeC", RED_PACKET_TYPE_C_EXPIRATION_TIME);
// 保持主线程运行,以便观察任务处理结果
Thread.sleep(15 * 60 * 1000); // 15 分钟
redissonClient.shutdown(); // 关闭 Redisson 客户端
executorService.shutdown(); // 关闭线程池
}
private static void handleOrderExpiration(RedissonClient redissonClient) {
// 创建订单队列
RBlockingQueue<String> queue = redissonClient.getBlockingQueue("orderQueue");
// 创建一个延迟队列,用于处理订单过期
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);
// 添加订单到延迟队列中,10 秒后过期
String orderId = "order12345";
delayedQueue.offer(orderId, ORDER_EXPIRATION_TIME, TimeUnit.SECONDS);
// 使用线程池处理订单过期任务
executorService.submit(() -> {
while (true) {
try {
// 阻塞式获取队列中的任务
String expiredOrderId = queue.take(); // 这会阻塞直到有任务到达
if (expiredOrderId != null) {
// 订单过期处理逻辑
System.out
.println("订单 " + expiredOrderId + " 已过期!");
// 可以在这里添加更多的过期处理逻辑,例如通知系统管理员或更新数据库状态等
}
} catch (InterruptedException e) {
// 处理中断异常
System.err.println("处理订单过期任务的线程被中断!");
Thread.currentThread().interrupt(); // 保持中断状态
} catch (Exception e) {
// 处理其他异常
e.printStackTrace();
}
}
});
}
private static void handleRedPacketExpiration(RedissonClient redissonClient, String type, long expirationTime) {
// 创建红包队列
RBlockingQueue<String> queue = redissonClient.getBlockingQueue("redPacketQueue:" + type);
// 创建一个延迟队列,用于处理红包过期
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);
// 添加不同类型的红包到延迟队列中
for (int i = 1; i <= 5; i++) {
String redPacketId = type + "_redPacket_" + i;
delayedQueue.offer(redPacketId, expirationTime, TimeUnit.MINUTES);
}
// 使用线程池处理红包过期任务
executorService.submit(() -> {
while (true) {
try {
// 阻塞式获取队列中的任务
String expiredRedPacketId = queue.take(); // 这会阻塞直到有任务到达
if (expiredRedPacketId != null) {
// 红包过期处理逻辑
System.out.println("红包 " + expiredRedPacketId + " 已过期!");
// 可以在这里添加更多的过期处理逻辑,例如通知用户或更新数据库状态等
}
} catch (InterruptedException e) {
// 处理中断异常
System.err.println("处理红包过期任务的线程被中断!");
Thread.currentThread().interrupt(); // 保持中断状态
} catch (Exception e) {
// 处理其他异常
e.printStackTrace();
}
}
});
}
}
通过以上的代码示例,您可以看到如何使用 Redis 的消息机制和延迟队列来处理实时消息和定时任务。这些实现为应用程序提供了强大的消息传递和任务调度功能,适用于多种场景。