Spring无法对多线程进行事务控制,原因是:
Spring在底层是把数据库连接和事务资源绑定在线程变量(ThreadLocal)里的,所以开多少个线程,理论上就会建立多少个连接;每个线程都有自己的连接,事务自然也就不是同一个了。
解决办法:手动为每个线程开启事务,并把每个线程的事务状态放到一个同步集合里面;只要其中任何一个线程发生异常,就循环回滚所有线程的事务。注意:事务资源是绑定在开启它的线程上的,因此每个事务的提交或回滚都必须在各自的线程内执行。
假如service中的一个方法由以下逻辑构成:
1.前面的是调用多线程前的操作
2.调用多线程的操作
假设其中任何一个与数据库的更新操作发生了异常,想要整体回滚怎么办?那么就要用到以下的方式了:
List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());
// 在每组逻辑操作之前加入以下代码
// 使用这种方式将事务状态都放在同一个事务里面
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
详细DEMO:
TestServiceImpl:
package com.test.impl;
import com.test.entity.User2;
import com.test.entity.User3;
import com.test.mapper.User2Mapper;
import com.test.mapper.User3Mapper;
import com.test.service.TestBService;
import com.test.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by liuyachao on 2018/9/3.
*/
@Slf4j
@Service
public class TestServiceImpl implements TestService {
@Autowired
private User2Mapper user2Mapper;
@Autowired
private User3Mapper user3Mapper;
@Autowired
private TestBService testBService;
@Autowired
private PlatformTransactionManager transactionManager;
List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());
int count = 112;
static int countTest = 0;
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public int saveUser2(User2 user2) {
Integer result = 0;
try{
result = user2Mapper.insertSelective(user2);
//int i = 1/0;
if(user2.getId() == 114){
int i = 1/0;
}
}catch (Exception e){
log.error("插入异常",e);
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
return result;
}
return result;
}
@Override
public User3 getUser3List(User3 user3) {
User3 result =user3Mapper.selectByPrimaryKey(user3.getId());
return result;
}
@Override
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
public void threadMethod(){
User2 user1 = new User2();
user1.setId(111);
user1.setPassword("1");
user1.setUsername("1");
try{
// 使用这种方式将事务状态都放在同一个事务里面
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
transactionStatuses.add(status);
testBService.saveUser2(user1);
}catch (Exception e){
e.printStackTrace();
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
System.out.println("main insert is over");
try{
for(int a=0 ;a<3;a++){
ThreadOperation threadOperation= new ThreadOperation();
Thread innerThread = new Thread(threadOperation);
/*innerThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e){
*//*throw new RuntimeException();
log.error("###内部线程发生异常");
e.printStackTrace();*//*
// 这边回滚不好使,需要用逻辑删除处理增加的数据
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
});*/
innerThread.start();
}
}catch (Exception e){
log.error("###线程异常");
e.printStackTrace();
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
}
public class ThreadOperation implements Runnable {
@Override
public void run() {
try{
// 使用这种方式将事务状态都放在同一个事务里面
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。
TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态
transactionStatuses.add(status);
User2 user2 = new User2();
user2.setId(count++);
user2.setPassword("10");
user2.setUsername("10");
/**
* 1.这里如果用其他类的saveUser2方法,在这个线程内事务生效,其他线程不受影响
* 2.如果是用本类的方法,这个线程内的事务不生效,其他线程也不受影响
*/
testBService.saveUser2(user2); // testBService.
System.out.println("thread insert is over");
}catch (Exception e){
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
//throw new RuntimeException();
// 事务回滚不管用
/*TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
throw new RuntimeException();*/
/*for (TransactionStatus transactionStatus:transactionStatuses) {
transactionStatus.setRollbackOnly();
}*/
}
}
}
/**
* 多线程争夺全局资源
* @param args
*/
public static void main(String[] args){
for(int a=0 ;a<100;a++){
ThreadOperation2 threadOperation2 = new ThreadOperation2();
Thread innerThread = new Thread(threadOperation2);
innerThread.start();
}
System.out.println(countTest);
}
public static class ThreadOperation2 implements Runnable {
@Override
public void run() {
countTest++;
}
}
}
TestService:
package com.test.service;
import com.test.entity.User2;
import com.test.entity.User3;
/**
 * Service contract of the multi-threaded transaction demo.
 *
 * Created by liuyachao on 2018/9/3.
 */
public interface TestService {
/**
 * Persists the given user.
 *
 * @param user2 the user to insert
 * @return the result of the insert (presumably the affected row count; confirm against the mapper)
 */
int saveUser2(User2 user2);
/**
 * Looks up a user; despite the name, a single object is returned.
 *
 * @param user3 carrier of the lookup criteria (the visible implementation reads only its id)
 * @return the matching user
 */
User3 getUser3List(User3 user3);
/**
 * Runs the demo scenario: the visible implementation inserts one user on the calling thread
 * and then starts several threads that each insert another user.
 */
void threadMethod();
}
TestBService:
package com.test.service;
import com.test.entity.User2;
import com.test.entity.User3;
/**
 * Second service of the demo; the visible callers invoke it from another bean.
 *
 * Created by liuyachao on 2018/9/3.
 */
public interface TestBService {
/**
 * Persists the given user.
 *
 * @param user2 the user to insert
 * @return the result of the insert (presumably the affected row count; confirm against the mapper)
 */
int saveUser2 (User2 user2);
/**
 * Looks up a user; despite the name, a single object is returned.
 *
 * @param user3 carrier of the lookup criteria (the visible implementation reads only its id)
 * @return the matching user
 */
User3 getUser3List(User3 user3);
}
TestBServiceImpl:
package com.test.impl;
import com.test.entity.User2;
import com.test.entity.User3;
import com.test.mapper.User2Mapper;
import com.test.mapper.User3Mapper;
import com.test.service.TestBService;
import com.test.service.TestService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
/**
 * Service whose {@link #saveUser2(User2)} lets exceptions propagate, so that a surrounding
 * transaction is rolled back by the caller or by the transactional proxy.
 *
 * Created by liuyachao on 2018/9/3.
 */
@Slf4j
@Service
public class TestBServiceImpl implements TestBService {
    /** User id for which {@link #saveUser2(User2)} simulates a failure. */
    private static final int FAILING_USER_ID = 114;

    @Autowired
    private User2Mapper user2Mapper;
    @Autowired
    private User3Mapper user3Mapper;

    // Unused by this class; retained because they are package-visible.
    int count = 112;
    static int countTest = 0;

    /**
     * Inserts the given user, joining the caller's transaction if there is one.
     *
     * @param user2 the user to insert
     * @return the value returned by the mapper's insert
     * @throws ArithmeticException to simulate a failure when the user's id is 114
     */
    @Override
    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})
    public int saveUser2(User2 user2) {
        int result = user2Mapper.insertSelective(user2);
        // Null-safe comparison: the id may be absent, e.g. when the database generates it.
        Integer id = user2.getId();
        if (id != null && id == FAILING_USER_ID) {
            throw new ArithmeticException("simulated failure for user id " + FAILING_USER_ID);
        }
        return result;
    }

    /**
     * Looks up a single user by primary key.
     *
     * @param user3 carrier of the id to look up
     * @return whatever the mapper returns for that id
     */
    @Override
    public User3 getUser3List(User3 user3) {
        return user3Mapper.selectByPrimaryKey(user3.getId());
    }
}
User2:
package com.test.entity;
import lombok.Data;
import java.io.Serializable;
/**
 * Serializable data holder for a user record with id, username and password.
 * Accessors are not written out here; the demo code calls getId/setId/setUsername/setPassword,
 * which Lombok's {@code @Data} is expected to generate.
 */
@Data
public class User2 implements Serializable{
private static final long serialVersionUID = 9085886691811169694L;
// Identifier of the user; the demo services compare it against 114 to simulate a failure.
private Integer id;
// Login name.
private String username;
// Password as given by the caller; NOTE(review): appears to be stored as-is — confirm.
private String password;
}
具体的mapper等方法自己可以做一个属于自己的demo来验证事务是否整体回滚:
此demo操作均为新增数据的操作,调用多线程前、调用多线程均为新增数据。
在多线程调用的testBService.saveUser2(user2)方法内部模拟一个异常(例如:int i = 1/0;),用来验证:当其中一个线程满足条件并发生异常时,事务会整体回滚,数据库中不会新增任何数据。
作者:Jeebiz 创建时间:2020-06-17 12:55
最后编辑:Jeebiz 更新时间:2024-10-26 16:27
最后编辑:Jeebiz 更新时间:2024-10-26 16:27