当前位置: 首页 > article >正文

java 中多线程、 队列使用实例,处理大数据业务

场景: 从redis 订阅数据 调用线程来异步处理数据

直接上代码

定义线程管理类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.concurrent.*;

/**
 * Created with IntelliJ IDEA.
 * @Description 线程池管理类
 */
@Component
public class ThreadPoolManager implements BeanFactoryAware {
    private static Logger logger = LoggerFactory.getLogger(ThreadPoolManager.class);
    //用于从IOC里取对象
    private BeanFactory factory; //如果实现Runnable的类是通过spring的application.xml文件进行注入,可通过 factory.getBean()获取,这里只是提一下
    // 线程池维护线程的最少数量 (根据环境而定)
    private final static int CORE_POOL_SIZE = 10;
    // 线程池维护线程的最大数量 (根据环境而定)
    private final static int MAX_POOL_SIZE = 50;
    // 线程池维护线程所允许的空闲时间
    private final static int KEEP_ALIVE_TIME = 0;
    // 线程池所使用的缓冲队列大小 (此处队列设置 需要考虑处理数据的效率  内存的大小)
    private final static int WORK_QUEUE_SIZE = 99999;

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        factory = beanFactory;
    }

    // 消息队列
    public LinkedBlockingQueue<String> getMsgQueue() {
        return msgQueue;
    }


    LinkedBlockingQueue<String> msgQueue = new LinkedBlockingQueue<>();


    /**
     * 当线程池的容量满了,执行下面代码,将推送数据存入到缓冲队列
     */
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            String  temp = ((MsgHandleThread) r).getRecord();
            if (StringUtils.isEmpty(temp)) {
                msgQueue.offer(temp);
            }
        }
    };
    /**
     * 创建线程池
     */
    final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME
            , TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);


    /**
     * 将任务加入线程池---执行数据处理
     */
    public void addPushRecord(String  record) {
        MsgHandleThread subThread=new MsgHandleThread(record);
        threadPool.execute(subThread);
    }

    /**
     * 线程池的定时任务----> 称为(调度线程池)。此线程池支持 定时以及周期性执行任务的需求。
     */
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);


    /**
     * 检查(调度线程池),每秒执行一次,查看订单的缓冲队列是否有 订单记录,则重新加入到线程池
     */
    final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            //判断缓冲队列是否存在记录
            if (!msgQueue.isEmpty()) {
                //当线程池的队列容量少于WORK_QUEUE_SIZE,则开始把缓冲队列的订单 加入到 线程池
                if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
                    String record = msgQueue.poll();
                    MsgHandleThread subThread=new MsgHandleThread(record);
                    threadPool.execute(subThread);
                }
            }
        }
    }, 0, 1, TimeUnit.SECONDS);



    /**
     * 终止订单线程池+调度线程池
     */
    public void shutdown() {
        //true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止
        scheduledFuture.cancel(false);
        scheduler.shutdown();
        threadPool.shutdown();
    }
}

任务处理类
/**
 * Created with IntelliJ IDEA.
 * @Description 订阅数据 处理
 */
@Component
@Scope("prototype")//spring 多例
public class MsgHandleThread implements Runnable {
    private Logger logger = LoggerFactory.getLogger(SubCheckDataThread.class);

    private IDataHandleService _service
    private String record;

    public SubCheckDataThread(String  _record) {
        this.record = _record;
    }
    public String getRecord() {
        return record;
    }
    @Override
    public void run() {
        try {
            if (StringUtils.isEmpty(this.record)) {
                return;
            }
            // 无法注入是采用此方法
               if (_service== null) {
                _service= ApplicationContextProvider.getBean(IDataHandleService .class);
            }
         	//TODO 具体业务
            logger.info("消费完成",record);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

调用
import com.yicheng.common.properties.SetProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;

/**
 * <p>
 * 订阅redis消息
 * </p>
 *
 * @Author: zhuYaqiang
 * @Date: 2024/06/12
 */
@Component
public class SubscribeCheckData {
    @Autowired
    private ThreadPoolManager threadPoolManager;

    /***
    * @Description:  查岗信息订阅---redis
    * @Param: [message]
    * @return: void
    * @Author: zhuYaqiang
    * @Date: 2024/06/12
    */
    public void receiveMessage(String message) {
        try {
                threadPoolManager.addPushRecord(message);
            } catch (Exception e) {
                e.printStackTrace();
            }
    }

}
redis 订阅消息后调用线程池处理数据
package com.yicheng.subscribeRedis;

import com.yicheng.common.properties.SetProperties;
import com.yicheng.subscribeRedis.alarm.SubscribeAlarmNoticeData;
import com.yicheng.subscribeRedis.check.SubscribeCheckData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * @title RedisSubscribeCHeck
 * @description
 * @create 2024/6/12 19:30
 */
@Configuration
public class RedisMessageListener {

    @Autowired
    private SetProperties setProperties;

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerCheckAdapter, MessageListenerAdapter listenerAlarmNoticeAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        listenerCheckAdapter.afterPropertiesSet();
        listenerAlarmNoticeAdapter.afterPropertiesSet();
        //订阅了的通道
        // 订阅查岗数据
        container.addMessageListener(listenerCheckAdapter, new PatternTopic(setProperties.getRedisCheckSub().getSubChannel()));
        //这个container 可以添加多个 messageListener
        return container;
    }

    /**
     * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
     * 监听查岗消息
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter listenerCheckAdapter(SubscribeCheckData receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

    /**
     * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
     *   监听报警通知信息
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAlarmNoticeAdapter(SubscribeAlarmNoticeData receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

}

以上代码已在实际项目中使用,觉得有用的点赞收藏评论


http://www.kler.cn/a/517147.html

相关文章:

  • ubuntu调用图形化网络测试工具
  • 6. 马科维茨资产组合模型+政策意图AI金融智能体(DeepSeek-V3)增强方案(理论+Python实战)
  • 【Redis】事务
  • 高效沟通驱动LabVIEW项目成功
  • composer安装指定php版本, 忽略平台原因导致的报错
  • OSCP - Proving Grounds - Quackerjack
  • 【Linux网络编程】传输层协议
  • Spring Boot 快速创建项目
  • Swing使用MVC模型架构
  • 日志收集Day005
  • 数据结构(一)顺序表和链表
  • 【前端】如何依靠纯前端实现拍照获取/选择文件等文字识别OCR技术
  • 【HarmonyOS NAPI 深度探索10】HarmonyOS Next 中的 NAPI 的架构与原理
  • U3D的.Net学习
  • 阿里云服务器在Ubuntu上安装redis并使用
  • Java 生成 PDF 文档 如此简单
  • OpenAI秘密重塑机器人军团: 实体AGI的崛起!
  • ngrok同时配置多个内网穿透方法
  • 航空航天混合动力(7)航空航天分布式电推进系统
  • ChopChopGo:一款针对Linux的取证数据快速收集工具
  • 【Unity】ScrollViewContent适配问题(Contentsizefilter不刷新、ContentSizeFilter失效问题)
  • 提升效率与体验,让笔记更智能
  • Java导出通过Word模板导出docx文件并通过QQ邮箱发送
  • 深入探索imi框架:PHP Swoole的高性能协程应用实践
  • 深入解析:Docker 容器如何实现文件系统与资源的多维隔离?
  • Go 不可重复协程安全队列