跳到主要内容

多线程事务控制与大量数据导入

大量数据导入问题

问题复现

我这里有一段代码,要将大量数据批量插入数据库

@Transactional(rollbackFor = Exception.class)
@Override
public void testAdd() {
List<Person> list = new ArrayList<>();
for (int i = 0; i < 1000000; i++) {
Person person = new Person();
person.setName("name" + i);
person.setPassword("password" + i);
person.setEmail("email" + i);
list.add(person);
}
baseMapper.save(list);
}

会报错:Error updating database. Cause: com.mysql.cj.jdbc.exceptions.PacketTooBigException: Packet for query is too large (83,666,757 > 67,108,864). You can change this value on the server by setting the 'max_allowed_packet' variable.

image-20240901194929206

问题原因

这个错误信息是MySQL抛出的PacketTooBigException,表示要执行的SQL语句(或数据包)太大,超过了服务器配置的max_allowed_packet限制。默认大小4194304 也就是4M

解决办法

修改配置

办法一:

  • 修改 my.ini 加上 max_allowed_packet =67108864,也就是64MB,不过这里可以设置的更大一些,以实际情况为准,修改完成之后要重启mysql服务,如果通过命令行修改就不用重启mysql服务。

办法二:

  • 通过命令行:设置为500M,会直接生效,mysql> set global max_allowed_packet = 500*1024*1024;

分批插入

我们将总数据进行计算,分批次差人,例如每次插入100条,然后计算商和余数,再进行分配插入,最后把余数插入。

@Transactional(rollbackFor = Exception.class)
@Override
public void testAdd() {
List<Person> list = new ArrayList<>();
for (int i = 0; i < 1000000; i++) {
Person person = new Person();
person.setName("name" + i);
person.setPassword("password" + i);
person.setEmail("email" + i);
list.add(person);
}
//10000 100 100
int c = 100;
int b = list.size() / c;
int d = list.size() % c;
for (int e = c; e <= c * b; e = e + c) {
baseMapper.save(list.subList(e - c, e));
}
if (d != 0) {
baseMapper.save(list.subList(c * b, list.size()));
}
}

这样也不是特别好,速度慢容易超时。

多线程分批导入

多线程不可以缓解数据库压力!这里只是一个思路,为了快一点,并且这里还存在事务失效问题。

这里给一个思路,但是这种写法会有很大的问题,会在下面介绍。

@Transactional(rollbackFor = Exception.class)
@Override
public void testAdd() {
List<Person> list = new ArrayList<>();
for (int i = 0; i < 1000000; i++) {
Person person = new Person();
person.setName("name" + i);
person.setPassword("password" + i);
person.setEmail("email" + i);
list.add(person);
}
int threadNum = 20;
int size = list.size();
ExecutorService executorService = Executors.newFixedThreadPool(20);
for (int i = 0; i < 20; i++) {
final List<Person> subList = list.subList(size / threadNum * i, size / threadNum * (i + 1));
executorService.execute(() -> {
try {
baseMapper.save(subList);
} catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}

这里会有多线程的事务失效问题:

事务失效的原因是因为你使用了多线程执行数据库操作,而Spring的声明式事务管理是基于单线程的。Spring的事务是通过AOP实现的,它只能在当前线程中生效。而在多线程的情况下,每个线程都是独立的,因此事务管理无法跨线程生效。

多线程事务控制问题

编程式事务+CyclicBarrier

在上面给出的解决办法中,我们使用多线程来解决大量数据导入数据库的问题,但是这样会导致多线程的事务问题。

如果必须使用多线程,可以在每个线程中手动管理事务。比如,使用 TransactionTemplate 或直接获取 DataSourceTransactionManager,在每个线程中手动开始和提交事务。

@Override
public void testAdd() {
List<Person> list = new ArrayList<>();
for (int i = 0; i < 1000000; i++) {
Person person = new Person();
person.setName("name" + i);
person.setPassword("password" + i);
person.setEmail("email" + i);
list.add(person);
}


int nThreads = 20;
CyclicBarrier cyclicBarrier = new CyclicBarrier(nThreads);
AtomicReference<Boolean> rollback = new AtomicReference<>(false);
int size = list.size();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
// 20000 20 1000
for (int i = 0; i < nThreads; i++) {
final List<Person> itemImputList = list.subList(size / nThreads * i, size / nThreads * (i + 1));
executorService.execute(() -> {
TransactionStatus tranction = transactionalUtil.begin();
try {
// insert 操作 小于1 就抛异常
if (baseMapper.save(itemImputList) < 1) {
log.info("手动异常");
throw new RuntimeException("插入数据失败");
}
} catch (Exception e) {
// 如果当前线程异常 则设置回滚标志
rollback.set(true);
log.error("插入数据失败");
}
// 等待所有线程的事务结果
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
if (rollback.get()) {
transactionalUtil.rollback(tranction);
log.info("rollback");
return;
}
transactionalUtil.commit(tranction);
});
}
executorService.shutdown();
}

在上述代码中,我们创建了 20 个线程,使用 CyclicBarrier 来确保所有线程在执行到特定代码时进行同步。通过 AtomicReference 来记录是否有线程出现异常。如果有线程插入数据失败并抛出异常,会将原子引用设置为 true,在后续判断中进行事务回滚操作;如果所有线程都成功插入数据,则提交各自线程的事务。解决问题的思路:

  • 手动事务管理:每个线程在自己的上下文中手动管理事务,避免Spring声明式事务无法跨线程控制的限制。
  • 多线程同步:通过 CyclicBarrier 保证所有线程在同一个点同步,以确保所有线程的事务都统一提交或回滚。
  • 全局事务控制:使用 AtomicReference<Boolean> 实现线程间的安全通信,标记是否需要回滚,确保多线程操作时事务一致性。

一些代码说明:

CyclicBarrier 的作用是使多个线程在某个点进行同步,也就是让所有线程都达到一个“屏障”后,再一起继续执行。每个线程在调用 cyclicBarrier.await() 后会被阻塞,直到所有参与的线程都调用了 await()。应用场景:多个线程并发执行不同的任务,任务完成后需要进行统一处理,只有当所有线程都完成各自任务后,才允许进行下一步操作。

AtomicReference 主要用于多线程场景下,确保在不使用同步锁的情况下,实现对某个对象引用的安全更新和读取。它是通过硬件级的 CAS(Compare-And-Swap, 比较并交换) 操作实现的,避免了锁的开销。

  • get(): 获取当前的引用值。
  • set(): 设置新的引用值。
  • compareAndSet(): 比较当前值与期望值,如果相等则更新为新的值,原子操作。

虽然我们在这里使用多线程批量插入数据到 MySQL 中,并解决了多线程内部事务控制的问题,但是这种方式实际上是基于编程式事务配合二阶段提交(2PC)使用的,而 2PC 存在一些潜在的问题。例如,当协调者通知事务参与者提交事务时,如果某些事务参与者提交失败,就会出现数据不一致的情况。如果提交成功的事务数据被写入数据库,而提交失败的事务数据则没有被正确处理,导致数据的不一致性。同时,代码中的多线程使用也会存在问题:

  • 如果某个线程在执行过程中出现异常导致无法响应 CyclicBarrier 的等待,可能会导致整个程序的阻塞。
  • 如果大量子线程同时进入await状态,可能会导致线程池的资源暂时被占用,从而影响其他任务的执行。如果此时有新的任务提交到线程池,可能会因为线程资源不足而导致任务排队等待,甚至可能出现线程池 “炸了” 的情况,即任务积压过多,响应时间变长。

让子线程直接执行完后让主线程来提交事务是一种可行的思路。这样可以避免子线程的阻塞,提高线程池的利用率。但是,这需要确保主线程能够准确地获取子线程的执行结果,并根据这些结果来决定是否提交事务。

如果其中一个事务提交失败,需要考虑补偿机制来达到最终一致性。补偿机制的具体实现方式取决于具体的业务场景和需求。一种常见的做法是记录事务的执行状态和相关信息,以便在发现事务提交失败时能够进行重试或采取其他补偿措施。例如,可以将失败的事务放入一个重试队列中,稍后重新尝试提交。如果多次重试仍然失败,可以考虑采取人工干预或其他方式来解决问题。

多线程事务与分布式事务

从某种角度上讲,多线程事务可以理解为分布式事务的一种简化形式。

  • 在多线程环境中,每个线程可以看作是一个独立的事务参与者,它们之间需要进行协调和同步,以确保数据的一致性。

  • 而在分布式系统中,多个节点之间的事务处理更加复杂,需要考虑网络延迟、节点故障等因素。

解决分布式事务的思路

解决分布式事务最好的方法是尽量避免出现分布式事务。在实际应用中,大部分解决分布式事务的方案都是追求最终一致性。例如,可以采用消息队列来异步处理事务,或者使用分布式事务框架来协调事务的提交和回滚。

CountdownLatch闭锁

整体思路:

  1. 主线程使用 CountDownLatch 等待所有子线程执行完毕。
  2. 子线程完成插入操作后,主线程检查结果,如果有任何一个子线程执行失败,主线程将 IS_OK 设置为 false
  3. 子线程在完成插入并等待主线程通知后,检查 IS_OK 的值,如果为 true,则提交事务;如果为 false,则回滚事务。

注意要将IS_OK设置为volatile,保证多线程之间是可见的。

@Resource
private NoAutoTransactionalUtil transactionalUtil;

volatile Boolean IS_OK = true;

@Override
public void testAdd() {
List<Person> list = new ArrayList<>();
for (int i = 0; i < 1000000; i++) {
Person person = new Person();
person.setName("name" + i);
person.setPassword("password" + i);
person.setEmail("email" + i);
list.add(person);
}


int nThreads = 20;
CountDownLatch childMonitor = new CountDownLatch(nThreads);
// 主线程收集子线程的结果
List<Boolean> childResponse = Collections.synchronizedList(new ArrayList<>());
// 子线程在该对象上等待主线程的通知
CountDownLatch parentMonitor = new CountDownLatch(1);
IS_OK = true;

int size = list.size();
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
// 20000 20 1000
for (int i = 0; i < nThreads; i++) {
final List<Person> itemImputList = list.subList(size / nThreads * i, size / nThreads * (i + 1));
executorService.execute(() -> {
TransactionStatus tranction = transactionalUtil.begin();
try {
// insert 操作 小于1 就抛异常
if (baseMapper.save(itemImputList) < 1) {
log.info("手动异常");
throw new RuntimeException("插入数据失败");
}
childResponse.add(Boolean.TRUE);
childMonitor.countDown();
log.info("线程{}正常执行成功,等待其他线程", Thread.currentThread().getName());
parentMonitor.await();
if (IS_OK) {
// 事务提交 所有线程都正常执行
log.info("线程正常执行,线程{}提交事务", Thread.currentThread().getName());
transactionalUtil.commit(tranction);
} else {
// 事务回滚
log.info("线程异常执行,线程{}回滚事务", Thread.currentThread().getName());
transactionalUtil.rollback(tranction);
}
} catch (Exception e) {
childResponse.add(Boolean.FALSE);
childMonitor.countDown();
log.error("线程{}异常执行,开始事务回滚", Thread.currentThread().getName());
transactionalUtil.rollback(tranction);
}
});
}
try {
childMonitor.await();
for (Boolean response : childResponse) {
if (!response) {
// 有一个线程执行失败,主线程通知所有子线程回滚事务
log.info("有一个线程执行失败,主线程通知所有子线程回滚事务");
IS_OK = false;
break;
}
}
// 主线程获取结果成功,通知所有子线程提交事务
parentMonitor.countDown();
} catch (Exception e) {
e.printStackTrace();
}
executorService.shutdown();
}

一些代码说明:

CountDownLatch 维护了一个倒计数器(计数值),初始时通过构造函数设置。每当一个线程完成任务时,会调用 countDown() 方法,倒计数器减一。当计数器变为零时,所有等待在 await() 上的线程将被唤醒,继续执行。

常用方法:

  • await():阻塞当前线程,直到计数器的值为 0。
  • countDown():将计数器减一,表示线程的任务完成。
  • getCount():返回当前计数器的值。

问题:

  • 仍然存在二阶段提交可能导致的数据不一致性问题,例如在主线程通知子线程回滚事务之前,部分子线程可能已经提交了事务。
  • 对于大规模数据和高并发场景,可能需要进一步优化性能,例如调整线程池的大小、优化数据库操作等。
  • 事务可能太长,影响db链接释放

附录-事务管理工具类

@Component
public class NoAutoTransactionalUtil {
@Autowired
private DataSourceTransactionManager dataSourceTransactionManager;

/**
* 开启事务
*
* @return
*/
public TransactionStatus begin() {
TransactionStatus transaction = dataSourceTransactionManager.getTransaction(new DefaultTransactionAttribute());
return transaction;
}

/**
* 提交事务
*
* @return
*/
public void commit(TransactionStatus transactionStatus) {
dataSourceTransactionManager.commit(transactionStatus);
}

/**
* 回滚事务
*
* @return
*/
public void rollback(TransactionStatus transactionStatus) {
dataSourceTransactionManager.rollback(transactionStatus);
}
}

参考资料

mysql批量插入数据量过大报错两种解决办法