多线程事务控制与大量数据导入
大量数据导入问题
问题复现
我这里有一段代码,要将大量数据批量插入数据库
@Transactional(rollbackFor = Exception.class)
@Override
public void testAdd() {
List<Person> list = new ArrayList<>();
for (int i = 0; i < 1000000; i++) {
Person person = new Person();
person.setName("name" + i);
person.setPassword("password" + i);
person.setEmail("email" + i);
list.add(person);
}
baseMapper.save(list);
}
会报错:Error updating database. Cause: com.mysql.cj.jdbc.exceptions.PacketTooBigException: Packet for query is too large (83,666,757 > 67,108,864). You can change this value on the server by setting the 'max_allowed_packet' variable.
问题原因
这个错误信息是MySQL抛出的PacketTooBigException
,表示要执行的SQL语句(或数据包)太大,超过了服务器配置的max_allowed_packet
限制。默认大小4194304 也就是4M
解决办法
修改配置
办法一:
- 修改 my.ini 加上
max_allowed_packet =67108864
,也就是64MB,不过这里可以设置的更大一些,以实际情况为准,修改完成之后要重启mysql服务,如果通过命令行修改就不用重启mysql服务。
办法二:
- 通过命令行:设置为500M,会直接生效,
mysql> set global max_allowed_packet = 500*1024*1024;
分批插入
我们将总数据进行计算,分批次差人,例如每次插入100条,然后计算商和余数,再进行分配插入,最后把余数插入。
@Transactional(rollbackFor = Exception.class)
@Override
public void testAdd() {
List<Person> list = new ArrayList<>();
for (int i = 0; i < 1000000; i++) {
Person person = new Person();
person.setName("name" + i);
person.setPassword("password" + i);
person.setEmail("email" + i);
list.add(person);
}
//10000 100 100
int c = 100;
int b = list.size() / c;
int d = list.size() % c;
for (int e = c; e <= c * b; e = e + c) {
baseMapper.save(list.subList(e - c, e));
}
if (d != 0) {
baseMapper.save(list.subList(c * b, list.size()));
}
}
这样也不是特别好,速度慢容易超时。
多线程分批导入
多线程不可以缓解数据库压力!这里只是一个思路,为了快一点,并且这里还存在事务失效问题。
这里给一个思路,但是这种写法会有很大的问题,会在下面介绍。
@Transactional(rollbackFor = Exception.class)
@Override
public void testAdd() {
List<Person> list = new ArrayList<>();
for (int i = 0; i < 1000000; i++) {
Person person = new Person();
person.setName("name" + i);
person.setPassword("password" + i);
person.setEmail("email" + i);
list.add(person);
}
int threadNum = 20;
int size = list.size();
ExecutorService executorService = Executors.newFixedThreadPool(20);
for (int i = 0; i < 20; i++) {
final List<Person> subList = list.subList(size / threadNum * i, size / threadNum * (i + 1));
executorService.execute(() -> {
try {
baseMapper.save(subList);
} catch (Exception e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}