java 中多线程、 队列使用实例,处理大数据业务
场景: 从redis 订阅数据 调用线程来异步处理数据
直接上代码
定义线程管理类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.concurrent.*;
/**
* Created with IntelliJ IDEA.
* @Description 线程池管理类
*/
@Component
public class ThreadPoolManager implements BeanFactoryAware {
private static Logger logger = LoggerFactory.getLogger(ThreadPoolManager.class);
//用于从IOC里取对象
private BeanFactory factory; //如果实现Runnable的类是通过spring的application.xml文件进行注入,可通过 factory.getBean()获取,这里只是提一下
// 线程池维护线程的最少数量 (根据环境而定)
private final static int CORE_POOL_SIZE = 10;
// 线程池维护线程的最大数量 (根据环境而定)
private final static int MAX_POOL_SIZE = 50;
// 线程池维护线程所允许的空闲时间
private final static int KEEP_ALIVE_TIME = 0;
// 线程池所使用的缓冲队列大小 (此处队列设置 需要考虑处理数据的效率 内存的大小)
private final static int WORK_QUEUE_SIZE = 99999;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
factory = beanFactory;
}
// 消息队列
public LinkedBlockingQueue<String> getMsgQueue() {
return msgQueue;
}
LinkedBlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();
/**
* 当线程池的容量满了,执行下面代码,将推送数据存入到缓冲队列
*/
final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
String temp = ((MsgHandleThread) r).getRecord();
if (StringUtils.isEmpty(temp)) {
msgQueue.offer(temp);
}
}
};
/**
* 创建线程池
*/
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME
, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);
/**
* 将任务加入线程池---执行数据处理
*/
public void addPushRecord(String record) {
MsgHandleThread subThread=new MsgHandleThread(record);
threadPool.execute(subThread);
}
/**
* 线程池的定时任务----> 称为(调度线程池)。此线程池支持 定时以及周期性执行任务的需求。
*/
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
/**
* 检查(调度线程池),每秒执行一次,查看订单的缓冲队列是否有 订单记录,则重新加入到线程池
*/
final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//判断缓冲队列是否存在记录
if (!msgQueue.isEmpty()) {
//当线程池的队列容量少于WORK_QUEUE_SIZE,则开始把缓冲队列的订单 加入到 线程池
if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
String record = msgQueue.poll();
MsgHandleThread subThread=new MsgHandleThread(record);
threadPool.execute(subThread);
}
}
}
}, 0, 1, TimeUnit.SECONDS);
/**
* 终止订单线程池+调度线程池
*/
public void shutdown() {
//true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止
scheduledFuture.cancel(false);
scheduler.shutdown();
threadPool.shutdown();
}
}
任务处理类
/**
* Created with IntelliJ IDEA.
* @Description 订阅数据 处理
*/
@Component
@Scope("prototype")//spring 多例
public class MsgHandleThread implements Runnable {
private Logger logger = LoggerFactory.getLogger(SubCheckDataThread.class);
private IDataHandleService _service
private String record;
public SubCheckDataThread(String _record) {
this.record = _record;
}
public String getRecord() {
return record;
}
@Override
public void run() {
try {
if (StringUtils.isEmpty(this.record)) {
return;
}
// 无法注入是采用此方法
if (_service== null) {
_service= ApplicationContextProvider.getBean(IDataHandleService .class);
}
//TODO 具体业务
logger.info("消费完成",record);
} catch (Exception e) {
e.printStackTrace();
}
}
}
调用
import com.yicheng.common.properties.SetProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
/**
* <p>
* 订阅redis消息
* </p>
*
* @Author: zhuYaqiang
* @Date: 2024/06/12
*/
@Component
public class SubscribeCheckData {
@Autowired
private ThreadPoolManager threadPoolManager;
/***
* @Description: 查岗信息订阅---redis
* @Param: [message]
* @return: void
* @Author: zhuYaqiang
* @Date: 2024/06/12
*/
public void receiveMessage(String message) {
try {
threadPoolManager.addPushRecord(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
redis 订阅消息后调用线程池处理数据
package com.yicheng.subscribeRedis;
import com.yicheng.common.properties.SetProperties;
import com.yicheng.subscribeRedis.alarm.SubscribeAlarmNoticeData;
import com.yicheng.subscribeRedis.check.SubscribeCheckData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/**
* @title RedisSubscribeCHeck
* @description
* @create 2024/6/12 19:30
*/
@Configuration
public class RedisMessageListener {
@Autowired
private SetProperties setProperties;
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerCheckAdapter, MessageListenerAdapter listenerAlarmNoticeAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
listenerCheckAdapter.afterPropertiesSet();
listenerAlarmNoticeAdapter.afterPropertiesSet();
//订阅了的通道
// 订阅查岗数据
container.addMessageListener(listenerCheckAdapter, new PatternTopic(setProperties.getRedisCheckSub().getSubChannel()));
//这个container 可以添加多个 messageListener
return container;
}
/**
* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
* 监听查岗消息
* @param receiver
* @return
*/
@Bean
MessageListenerAdapter listenerCheckAdapter(SubscribeCheckData receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
/**
* 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
* 监听报警通知信息
* @param receiver
* @return
*/
@Bean
MessageListenerAdapter listenerAlarmNoticeAdapter(SubscribeAlarmNoticeData receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
以上代码已在实际项目中使用,觉得有用的点赞收藏评论