包含组件内容
- RedisQueue:消息队列监听标识
- RedisQueueInit:Redis队列监听初始化器(扫描带有 @RedisQueue 注解的监听器并开启队列监听)
- RedisQueueListener:Redis消息队列监听接口(由业务方实现消费逻辑)
- RedisQueueService:Redis消息队列服务工具
代码实现
RedisQueue
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a type as a consumer of a named Redis queue.
 *
 * <p>The annotation is retained at runtime so it can be read reflectively,
 * and it may only be placed on type declarations.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RedisQueue {

    /**
     * Logical name of the queue the annotated type is bound to.
     *
     * @return the queue name; there is no default, so it must always be supplied
     */
    String value();
}
RedisQueueInit
import jakarta.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisQueueInit implements ApplicationContextAware {
public static final String REDIS_QUEUE_PREFIX = "redis-queue";
final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
@Resource
private RedissonClient redissonClient;
private ExecutorService executorService;
public static String buildQueueName(String queueName) {
return REDIS_QUEUE_PREFIX + ":" + queueName;
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, RedisQueueListener> queueListeners = applicationContext.getBeansOfType(RedisQueueListener.class);
if (!queueListeners.isEmpty()) {
executorService = createThreadPool();
for (Map.Entry<String, RedisQueueListener> entry : queueListeners.entrySet()) {
RedisQueue redisQueue = entry.getValue().getClass().getAnnotation(RedisQueue.class);
if (redisQueue != null) {
String queueName = redisQueue.value();
executorService.submit(() -> listenQueue(queueName, entry.getValue()));
}
}
}
}
private ExecutorService createThreadPool() {
return new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
Runtime.getRuntime().availableProcessors() * 4,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new NamedThreadFactory(REDIS_QUEUE_PREFIX),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
private void listenQueue(String queueName, RedisQueueListener redisQueueListener) {
queueName = buildQueueName(queueName);
RBlockingQueue<?> blockingQueue = redissonClient.getBlockingQueue(queueName);
log.info("Redis队列监听开启: {}", queueName);
while (!shutdownRequested.get() && !redissonClient.isShutdown()) {
try {
Object message = blockingQueue.take();
executorService.submit(() -> redisQueueListener.consume(message));
} catch (RedissonShutdownException e) {
log.info("Redis连接关闭,停止监听队列: {}", queueName);
break;
} catch (Exception e) {
log.error("监听队列异常: {}", queueName, e);
}
}
}
public void shutdown() {
if (executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException ex) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
shutdownRequested.set(true);
if (redissonClient != null && !redissonClient.isShuttingDown()) {
redissonClient.shutdown();
}
}
private static class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public NamedThreadFactory(String prefix) {
this.namePrefix = prefix;
}
@Override
public Thread newThread(@NotNull Runnable r) {
return new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
}
}
}
RedisQueueListener
/**
 * Callback contract for receiving queue messages.
 *
 * @param <T> type of the message content handed to the listener
 */
@FunctionalInterface
public interface RedisQueueListener<T> {

    /**
     * Handles a single message.
     *
     * @param content the message content
     */
    void consume(T content);
}
RedisQueueService
import jakarta.annotation.Resource;
import java.util.concurrent.TimeUnit;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
/**
 * Producer-side helper for publishing messages to Redisson blocking queues whose keys are
 * built with {@link RedisQueueInit#buildQueueName(String)}.
 */
@Component
public class RedisQueueService {

    @Resource
    private RedissonClient redissonClient;

    /**
     * Adds a message to the queue immediately.
     *
     * @param queueName logical queue name (without prefix)
     * @param content message content
     * @param <T> content type
     */
    public <T> void send(String queueName, T content) {
        RBlockingQueue<T> target = targetQueue(queueName);
        target.add(content);
    }

    /**
     * Offers a message to the Redisson delayed queue that wraps the target blocking queue.
     *
     * @param queueName logical queue name (without prefix)
     * @param content message content
     * @param delay delay amount
     * @param timeUnit unit of {@code delay}
     * @param <T> content type
     */
    public <T> void sendDelay(String queueName, T content, long delay, TimeUnit timeUnit) {
        RDelayedQueue<T> delayed = redissonClient.getDelayedQueue(this.<T>targetQueue(queueName));
        delayed.offer(content, delay, timeUnit);
    }

    /**
     * Same as {@link #sendDelay(String, Object, long, TimeUnit)} with the delay in milliseconds.
     *
     * @param queueName logical queue name (without prefix)
     * @param content message content
     * @param delay delay in milliseconds
     * @param <T> content type
     */
    public <T> void sendDelay(String queueName, T content, long delay) {
        sendDelay(queueName, content, delay, TimeUnit.MILLISECONDS);
    }

    /** Resolves the blocking queue stored under the prefixed key for {@code queueName}. */
    private <T> RBlockingQueue<T> targetQueue(String queueName) {
        String redisKey = RedisQueueInit.buildQueueName(queueName);
        return redissonClient.getBlockingQueue(redisKey);
    }
}
测试
创建监听对象
import cn.yiyanc.infrastructure.redis.annotation.RedisQueue;
import cn.yiyanc.infrastructure.redis.queue.RedisQueueListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Sample listener bound to the {@code "test"} queue; it logs every message it receives.
 */
@Slf4j
@Component
@RedisQueue("test")
public class TestListener implements RedisQueueListener<String> {

    /**
     * Logs the received message.
     *
     * @param content the message content
     */
    // The interface method is consume(T); the former name "invoke" overrode nothing,
    // so the class did not compile.
    @Override
    public void consume(String content) {
        log.info("队列消息接收 >>> {}", content);
    }
}
测试用例
import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Demo endpoint that publishes one immediate and one delayed message to the {@code "test"} queue.
 */
@RestController
@RequestMapping("queue")
public class QueueController {

    /** Queue the demo messages are published to. */
    private static final String QUEUE_NAME = "test";

    /** Delay applied to the delayed demo message, in milliseconds. */
    private static final long DELAY_MILLIS = 1000L;

    @Resource
    private RedisQueueService redisQueueService;

    /**
     * Sends {@code message} right away and a prefixed copy after {@link #DELAY_MILLIS}.
     *
     * @param message the text to publish
     */
    @PostMapping("send")
    public void send(String message) {
        redisQueueService.send(QUEUE_NAME, message);
        // Fixed the misspelling "messaege" in the delayed payload text.
        redisQueueService.sendDelay(QUEUE_NAME, "delay message -> " + message, DELAY_MILLIS);
    }
}
测试结果