当前位置: 首页 > article >正文

详解多线程与Spring事务

批量数据操作优化

在实际项目中,在排查单线程、数据库等细节问题后,接口速度依旧很慢,于是决定使用多线程进行优化,同时使用到多线程事务,多线程进行任务拆分能提高效率,但需要考虑的因素也随之增多,如事务、线程安全性、实际提升效率等,因此应结合业务场景,优先考虑单线程优化,再考虑多线程优化。

@Transactional 注解

我们最常用的方法通过在public方法上添加@Transactional 注解来实现事务管理。大多数情况下,把事务的启动、提交或者回滚全部交给Spring框架操作非常便捷,但有一种场景,如果有一个大批量数据需要操作,为提高处理效率,通常对数据进行分组,多线程处理,这个注解对事务就不起作用了。

Spring事务管理

Spring的确可负责事务管理的所有底层实现细节,而且不管你用的是什么持久层框架,如Hibernate、MyBatis,即便是JDBC也都提供了统一的事务模型,确保数据访问方式的变更不会影响到代码实现层面。事务管理的良好封装,提升了开发效率,但在一个事务里面是否可以支持多个线程同时进行数据写入?

Spring实现事务通过ThreadLocal把事务和当前线程进行了绑定。ThreadLocal作为本地线程变量载体,保存了当前线程的变量,并确保所有变量是线程安全的。这些封闭隔离的变量中就包含了数据库连接,Session管理的对象以及当前事务运行的其他必要信息,而开启的新线程是获取不到这些变量和对象的。事务内部冒然启用多线程,受限于业务场景,大多数情况下是不会有问题的,但是不能忽视潜在的风险。问题主要集中在两个方面:一方面导致事务失效,看似是提高了处理效率,但是一旦有异常相关数据将不会回滚,就会破坏业务的完整性。另一方面还会增加死锁的概率,无计划的并发处理,增加资源争抢的概率,其后果就是死锁,产生的异常进一步破坏业务的完整性。

虽然不能通过事务内,发起多线程处理。我们可以通过合理分块后,再启用多线程处理,通过类似分布式事务方式实现果。

假设我们要并行处理一个大的对象列表,然后将它们存储到数据库中。我们先将这些对象分组,将每个块传递给不同线程分别去调用加了事务的处理方法,最后将每个线程中处理的结果收集汇总。这样通过事务的传播机制既确保了业务的完整性,也通过并行处理提升了处理效率。

实现方案一

参考分布式事务的二阶段提交,由两个CountDownLatch来实现主线程、子线程的监控和全部提交

二阶段提交
1.当所有事务完成操作后,进入准备提交阶段,会向事务管理器发送事务准备成功信号。
2.事务管理器接受到所有成功信号后,通知所有事务进行提交 or 事务管理器接收到某事务发生异常,通知所有事务进行回滚操作。



import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.*;
import java.io.Serializable;
import java.math.BigDecimal;

@Data
@Entity
@Table(name = "stu")
@AllArgsConstructor
@NoArgsConstructor
public class Stu implements Serializable {

    private static final long serialVersionUID = 1L;

    @Id
    @Column(name = "id", nullable = false)
    private Integer id;

    @Column(name = "name", nullable = false)
    private String name;

    @Column(name = "price")
    private BigDecimal price;

    public Stu(String name, BigDecimal price) {
        this.name = name;
        this.price = price;
    }
}

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


@NoArgsConstructor
@AllArgsConstructor
@Data
public class RollBack {
    private boolean isRollback;
}

import com.example.demo.threadTransactional.entity.RollBack;
import com.example.demo.threadTransactional.repository.stuRepository;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import javax.transaction.SystemException;
import java.util.List;
import java.util.concurrent.*;


@Service
@Slf4j
public class StuService {

    @Resource
    private stuRepository stuRepository;

    @Resource
    @Lazy
    private StuService stuService;

    public void saveBook(List<stu> bookList) {

        // 根据业务场景,将大任务进行划分成小任务
        List<List<stu>> partition = Lists.partition(bookList, 3);

        // 子线程流程控制器
        CountDownLatch mainCountDownLatch = new CountDownLatch(1);

        // 回滚标记
        RollBack rollBack = new RollBack();

        // 主流程控制器
        CountDownLatch threadCountDownLatch = new CountDownLatch(partition.size());

        // 子线程预备执行结果
        BlockingDeque<Boolean> resultList = new LinkedBlockingDeque<>(partition.size());

        // 开启线程池
        int cpuNum = Runtime.getRuntime().availableProcessors() * 2;
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("multi-thread-%d").build();
        ExecutorService fixedThreadPool =
                new ThreadPoolExecutor(cpuNum, cpuNum, 0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>(), threadFactory);

        // 创建线程任务并获取结果
        for (List<stu> list : partition) {
            fixedThreadPool.submit(new MyTask(list, mainCountDownLatch,
                    threadCountDownLatch, rollBack, resultList, bookService));
        }
        if (resultList.stream().anyMatch(result->!result)){
            rollBack.setRollback(true);
        }

        try {
            boolean await = threadCountDownLatch.await(10, TimeUnit.SECONDS);
            if (!await){
                // 指定时间子线程任务未执行完成,所有事务回滚
                log.error("子线程执行超时,所有事务回滚");
                rollBack.setRollback(true);
            }
        } catch (InterruptedException e) {
            // 发生异常,数据回滚
            rollBack.setRollback(true);
            throw new RuntimeException(e);
        }

        // 主线程监控事务完成
        mainCountDownLatch.countDown();

    }

    class MyTask implements Runnable{

        private final BookService bookService;

        private final List<stu> bookList;

        private final CountDownLatch mainCountDownLatch;

        private final CountDownLatch threadCountDownLatch;

        private final RollBack rollBack;

        BlockingDeque<Boolean> resultList;

        public MyTask(List<stu> bookList,
                      CountDownLatch mainCountDownLatch,
                      CountDownLatch threadCountDownLatch,
                      RollBack rollBack,
                      BlockingDeque<Boolean> resultList,
                      BookService bookService) {
            this.bookList = bookList;
            this.mainCountDownLatch = mainCountDownLatch;
            this.threadCountDownLatch = threadCountDownLatch;
            this.rollBack = rollBack;
            this.resultList = resultList;
            this.bookService = bookService;
        }

        public void run() {
            try {
                bookService.saveAll(bookList, mainCountDownLatch, threadCountDownLatch, rollBack, resultList);
            } catch (Exception e) {
                log.error(e.getMessage());
                throw new RuntimeException(e);
            }
        }
    }

    @Transactional(rollbackFor = Exception.class)
    public void saveAll(List<stu> bookList,
                            CountDownLatch mainCountDownLatch,
                            CountDownLatch threadCountDownLatch,
                            RollBack rollBack,
                            BlockingDeque<Boolean> threadDeque) throws Exception {
        try {
            for (stu book : bookList) {
                stuRepository.save(book);

                if ("book5".equals(book.getName())){
                    // 模拟某线程发生异常
                    int i = 1/0;
                }
            }
            threadDeque.add(true);
        } catch (Exception e) {
            log.error(String.format("线程%s发生异常,所有事务将回滚",Thread.currentThread().getName()));
            e.printStackTrace();
            rollBack.setRollback(true);
        }
        // 子线程准备提交事务
        threadCountDownLatch.countDown();

        // 等待主线程监听完所有事务处理情况
        mainCountDownLatch.await();

        // 子线程根据标记确定是否回滚
        if (rollBack.isRollback()){
            throw new SystemException(Thread.currentThread().getName() + " RollBack");
        }
    }
}

@RestController
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class TestController {

private final StuService stuService;

@PostMapping("/test")
public void test() {

    List<Stu> tuss = new ArrayList<>();

    for (int i = 0; i < 10; i++){
        tuss .add(new Stu("stu"+i, BigDecimal.valueOf(i)));
    }
    stuService.saveBook(books);
}

}


http://www.kler.cn/a/6618.html

相关文章:

  • 在Java虚拟机(JVM)中,方法可以分为虚方法和非虚方法。
  • 音视频入门基础:MPEG2-TS专题(21)——FFmpeg源码中,获取TS流的视频信息的实现
  • Clickhouse(Centos)
  • ubuntu,自动休眠后,程序自动暂停。如何破?
  • FingerprintJS的使用
  • 青少年编程与数学 02-004 Go语言Web编程 02课题、依赖管理
  • PVE虚拟机安装爱快/iKuai软路由(爱快软路由虚拟机系统安装教程)
  • npm ci 和 npm i 的区别
  • k8s常用软件包下载
  • RecycleView小结
  • php+微信小程序 websocket
  • C/C++每日一练(20230403)
  • 【数据库管理】⑥日志挖掘LogMiner
  • 图像镶嵌拼接
  • ToBeWritten之MIPS汇编基础铺垫
  • aspnet030高校学生团体管理系统sqlserver
  • 夜天之书 #80 推特开源算法与开放革命
  • RocketMQ消息ACK机制及消费进度管理
  • Linux——控制启动过程(更改root密码)
  • springcloud整合knike4j聚合微服务接口文档
  • 蓝桥杯 路径
  • 2.11 循环赛日程表
  • 编译与链接相关知识
  • 推荐一款强大的OCR工具
  • Golang电脑上怎么下载-Go安装和环境配置图文教程[超详细]
  • 联想服务器上安装 ffmpeg