【spring】 spring 事务的应用与实现

说明如何用 spring 进行事务管理和 spring 如何实现事务

spring 事务中的概念

  • 事务定义 TransactionDefinition
    定义 spring 事务的属性

    • 隔离等级(Isolation Level),控制本事务与其他事务的隔离级别
      • ISOLATION_DEFAULT
      • ISOLATION_READ_UNCOMMITTED
      • ISOLATION_READ_COMMITTED
      • ISOLATION_REPEATABLE_READ
      • ISOLATION_SERIALIZABLE
    • 传播行为(Propagation Behavior),表示由一个事务控制的方法调用另一个事务控制的方法时的行为
      • PROPAGATION_REQUIRED,如果当前有事务,就加入当前事务;如果没有,则打开一个新事务
      • PROPAGATION_SUPPORTS,如果当前有事务,就加入当前事务;如果没有,就以非事务方式执行
      • PROPAGATION_MANDATORY,如果当前有事务,就加入当前事务;如果没有就抛出异常
      • PROPAGATION_REQUIRES_NEW,如果当前有事务,就挂起当前事务,然后开启新事务
      • PROPAGATION_NOT_SUPPORTED,总是以非事务方式执行,如果当前有事务,就挂起
      • PROPAGATION_NEVER,总是以非事务方式执行,如果当前有事务,就抛出异常
      • PROPAGATION_NESTED,如果当前有事务,就在嵌套事务内执行;如果没有就打开一个新事务
    • 超时(Timeout),单位是秒
    • 是否只读(Read Only)
  • 事务状态 TransactionStatus
    表示一个事务的状态

  • 事务管理器 PlatformTransactionManager
    用来操作事务和管理事务,有以下操作

    • TransactionStatus getTransaction(TransactionDefinition definition) 返回当前事务或开启新事务
    • void commit(TransactionStatus status) 提交事务
    • void rollback(TransactionStatus status) 回滚事务

spring 事务的使用方式

编程式事务

使用 TransactionTemplate 或者直接使用底层的 PlatformTransactionManager (不推荐), 这种方式比较少使用

1
2
3
4
5
6
7
8
TransactionTemplate t = new TransactionTemplate();
t.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); // 设置事务属性
t.execute(new TransactionCallback<Integer>() {
@Override
public Integer doInTransaction(TransactionStatus status) {
// do something
}
});

基于注解的声明式事务

用 @Transactional 注解标记类或方法,然后 spring 会在事务中执行方法

1
2
3
4
@Transactional(rollbackFor = Throwable.class)
public void foo() {
// do something
}

@Transactional 有如下属性:

  • value: transactionManager 的别名
  • transactionManager: 指定的事务管理器
  • propagation:事务传播行为
  • isolation:事务隔离级别
  • timeout:超时值
  • readOnly:是否只读
  • rollbackFor:表示哪些异常抛出时会回滚事务
  • rollbackForClassName:同上
  • noRollbackFor:表示哪些异常抛出时不会回滚事务
  • noRollbackForClassName:同上

基于 XML 配置的声明式事务

XML 方式是基于 tx 和 aop 名字空间的标签定义一个事务增强器

1
2
3
4
5
6
7
8
9
10
<tx:advice id="advice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="get*" propagation="REQUIRED" read-only="false" rollback-for="java.lang.Exception"/>
</tx:attributes>
</tx:advice>

<aop:config>
<aop:pointcut id="testService" expression="execution (* bar.foo.Service.*(..))"/>
<aop:advisor advice-ref="advice" pointcut-ref="testService"/>
</aop:config>

基于 TransactionProxyFactoryBean 的声明式事务

定义一个 TransactionProxyFactoryBean 单独为某些 bean 增加事务增强

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

public class MyBean {
public Integer getRecordWithTransaction() {
...
}

public Integer getRecordWithOutTransaction() {
...
}
}

@Bean
public MyBean myBean() {
return new MyBean();
}

@Bean
public TransactionProxyFactoryBean proxyFactory() {
Properties properties = new Properties();
properties.put("getRecordWithTransaction", "PROPAGATION_REQUIRED");

TransactionProxyFactoryBean f = new TransactionProxyFactoryBean();
f.setTarget(myBean());
f.setTransactionManager(txManager());
f.setTransactionAttributes(properties);
return f;
}

spring 事务的实现

事务同步

TransactionSynchronizationManager,实现事务的关键类,用于实现资源管理。资源管理,是对那些应该一个资源只在一个线程中使用的(如 JDBC Connections 和 Hibernate Sessions)资源进行管理。这类资源通常不会假设对线程进行绑定,所以进行事务管理时就需要这个操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87

// 成员都是静态的,数据都是 ThreadLocal,所以用 abstract 修饰
public abstract class TransactionSynchronizationManager {

// NamedThreadLocal 是 spring 对 ThreadLocal 的扩展,增加名字
// resources 是事务的资源
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");

// 可以通过 registerSynchronization 方法注册 TransactionSynchronization
// 在事务执行的过程中,会调用所有 TransactionSynchronization 的相关方法
private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");

private static final ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");

private static final ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");

private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");

private static final ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");

// 初始化事务同步,这是事务开始时要调用的方法
public static void initSynchronization() throws IllegalStateException {
// 检查 synchronizations 是否为空
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
logger.trace("Initializing transaction synchronization");
synchronizations.set(new LinkedHashSet<>());
}

// 清除同步,这是事务结束时要调用的方法
public static void clearSynchronization() throws IllegalStateException {
// 检查 synchronizations 是否为空
if (!isSynchronizationActive()) {
throw new IllegalStateException("Cannot deactivate transaction synchronization - not active");
}
logger.trace("Clearing transaction synchronization");
synchronizations.remove();
}

public static void bindResource(Object key, Object value) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Assert.notNull(value, "Value must not be null");

// 获取或创建 resources
Map<Object, Object> map = resources.get();
if (map == null) {
map = new HashMap<>();
resources.set(map);
}

// 将 key, value 放到 resources
Object oldValue = map.put(actualKey, value);
if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
oldValue = null;
}

if (oldValue != null) {
throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
}

public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
// doGetResource 会从 resources 获得数据
Object value = doGetResource(actualKey);
return value;
}

public static Object unbindResource(Object key) throws IllegalStateException {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
// doUnbindResource 从 resources 删除 key, value
Object value = doUnbindResource(actualKey);
if (value == null) {
throw new IllegalStateException(
"No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
}
return value;
}
}

事务的实现

spring 事务支持对不同的资源进行管理,它由 AbstractPlatformTransactionManager 实现,不同的资源需要继承 AbstractPlatformTransactionManager 然后覆写一些关键方法,从而实现对不同资源的事务管理。例如 jdbc 事务的实现是 DataSourceTransactionManager。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

// 开启或加入一个事务
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// 创建事务对象,由具体的事务管理器实现
Object transaction = doGetTransaction();

// 如果未传入 definition 就使用默认配置
if (definition == null) {
definition = new DefaultTransactionDefinition();
}

// 如果当前有事务,就调用 handleExistingTransaction, 根据传播行为的配置执行事务
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(definition, transaction, debugEnabled);
}

...

// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

// 暂停当前事务,SuspendedResourcesHolder 用于保存 TransactionSynchronizationManager 当前的成员值
SuspendedResourcesHolder suspendedResources = suspend(null);

try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

// 开始事务,doBegin 由子类实现,按照具体资源的语义开启事务
doBegin(transaction, definition);

/* 设置事务同步状态
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
TransactionSynchronizationManager.initSynchronization();
*/
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error ex) {
// 从 suspendedResources 恢复成员的值到 TransactionSynchronizationManager
resume(null, suspendedResources);
throw ex;
}
}
else {
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}

// 提交事务
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}

DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
if (defStatus.isLocalRollbackOnly()) {
processRollback(defStatus, false);
return;
}

if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
processRollback(defStatus, true);
return;
}

processCommit(defStatus);
}

// 回滚事务
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}

DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}

// 处理事务提交
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;

try {
boolean unexpectedRollback = false;
// 由子类实现
prepareForCommit(status);
// 调用 TransactionSynchronization 的 beforeCommit 方法
triggerBeforeCommit(status);
// 调用 TransactionSynchronization 的 beforeCompletion 方法
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;

if (status.hasSavepoint()) {
// save point 不展开分析
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
unexpectedRollback = status.isGlobalRollbackOnly();
// 提交事务,由子类实现
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}

if (unexpectedRollback) {
throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// 调用 TransactionSynchronizationManager.clearSynchronization() 以及 TransactionSynchronization 的 afterCompletion
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// doCommit 会抛出这个异常,检查是否在提交异常时进行回滚
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
// 调用 TransactionSynchronization 的 beforeCompletion
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}

try {
// 调用 TransactionSynchronization 的 afterCommit
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}

// 处理事务回滚
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;

try {
triggerBeforeCompletion(status);

if (status.hasSavepoint()) {
// save point 不展开分析
status.rollbackToHeldSavepoint();
}
else if (status.isNewTransaction()) {
// 执行回滚,由子类实现
doRollback(status);
}
else {
// 说明在一个大的事务内
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
doSetRollbackOnly(status);
}
}
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}

triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

if (unexpectedRollback) {
throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
cleanupAfterCompletion(status);
}
}

// 结束事务,调用相关的清除方法
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
status.setCompleted();

// 清除 TransactionSynchronizationManager
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}

// 调用 doCleanupAfterCompletion, doCleanupAfterCompletion 由子类实现
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}

if (status.getSuspendedResources() != null) {
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
}

jdbc 事务

要实现 jdbc 事务,需要保证在同一个线程中获取的都是同一个 Connection。 使用 DataSourceUtils.getConnection 方法可以获取 Connection。 DataSourceUtils.getConnection 会尝试用 TransactionSynchronizationManager.getResource 来事务里的 Connection。如果在一个事务里,TransactionSynchronizationManager.getResource 将返回之前的 ConnectionHolder,可以从中获取 Connection;如果不在一个事务里,TransactionSynchronizationManager.getResource 返回 null,这时 DataSourceUtils.getConnection 就从 DataSource 获取 Connection。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

public abstract class DataSourceUtils {

...

public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
try {
return doGetConnection(dataSource);
}
catch (SQLException ex) {
throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection", ex);
}
catch (IllegalStateException ex) {
throw new CannotGetJdbcConnectionException("Failed to obtain JDBC Connection: " + ex.getMessage());
}
}

public static Connection doGetConnection(DataSource dataSource) throws SQLException {

// 尝试从 TransactionSynchronizationManager 获取 Connection
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
conHolder.setConnection(fetchConnection(dataSource));
}
return conHolder.getConnection();
}

// 从 DataSource 获取 Connection
Connection con = fetchConnection(dataSource);

// 如果 TransactionSynchronizationManager 是激活状态,就将 Connection 注册到 TransactionSynchronizationManager 以便下次使用
if (TransactionSynchronizationManager.isSynchronizationActive()) {
try {
// 构造 ConnectionHolder
ConnectionHolder holderToUse = conHolder;
if (holderToUse == null) {
holderToUse = new ConnectionHolder(con);
}
else {
holderToUse.setConnection(con);
}
holderToUse.requested();

// 注册到 TransactionSynchronizationManager 并绑定 DataSource 和 ConnectionHolder
TransactionSynchronizationManager.registerSynchronization(new ConnectionSynchronization(holderToUse, dataSource));
holderToUse.setSynchronizedWithTransaction(true);
if (holderToUse != conHolder) {
TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
}
}
catch (RuntimeException ex) {
releaseConnection(con, dataSource);
throw ex;
}
}

return con;
}

...
}

事务需要事务管理器来管理,JDBC 事务由 DataSourceTransactionManager 类实现,它是 AbstractPlatformTransactionManager 的子类,主要是实现 AbstractPlatformTransactionManager 里定义的抽象方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {

// 获取事务对象
@Override
protected Object doGetTransaction() {
// 以 DataSourceTransactionObject 作为事务对象
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// obtainDataSource 方法获取一个 JDBC 的 DataSource 对象
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}

// 开始事务
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;

try {
// 从 DataSource 获取新的 Connection 并设置到事务对象
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// 得到 Connection 对象
con = txObject.getConnectionHolder().getConnection();

// prepareConnectionForTransaction 会设置是否只读和事务隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);

// 设置为手动提交
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
con.setAutoCommit(false);
}

// 根据 definition 决定是否执行 SET TRANSACTION READ ONLY
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);

// 设置事务超时时间
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}

// 绑定 DataSource 和 ConnectionHolder 到 TransactionSynchronizationManager
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}

// 获取 Connection 然后提交
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
try {
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}

// 获取 Connection 然后回滚
@Override
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
try {
con.rollback();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}

// 清理资源
@Override
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;

// 资源解除绑定
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

// 重置 Connection, 因为 Connection 可能来自连接池
Connection con = txObject.getConnectionHolder().getConnection();
try {
// 重置自动提交
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
// 重置事务隔离级别
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}

// 释放、关闭 Connection
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
}

txObject.getConnectionHolder().clear();
}
}

声明式事务的实现

不论是用注解还是用 XML 方式,都是使用 AOP 实现事务,通过 TransactionInterceptor 来进行事务功能的增强

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {

@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// invokeWithinTransaction 是父类 TransactionAspectSupport 的方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {

// 如果 tas 是 null, 说明方法需要用非事务的方式执行
TransactionAttributeSource tas = getTransactionAttributeSource();

// 事务的属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

// 获取 transaction manager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);

// 得到 join point
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

// 这是一般情况
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建事务,调用 transaction manager 的 getTransaction
// TransactionInfo 包含一个 TransactionStatus 成员
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

// 调用原方法
Object retVal;
try {
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 回滚,调用 transaction manager 的 rollback
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 恢复线程事务状态
cleanupTransactionInfo(txInfo);
}

// 提交,调用 transaction manager 的 commit
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
// transaction manager 是 CallbackPreferringPlatformTransactionManager 类型的情况
final ThrowableHolder throwableHolder = new ThrowableHolder();

try {
Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
try {
return invocation.proceedWithInvocation();
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
throw new ThrowableHolderException(ex);
}
}
else {
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});

if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
catch (ThrowableHolderException ex) {
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
ex2.initApplicationException(throwableHolder.throwable);
}
throw ex2;
}
catch (Throwable ex2) {
throw ex2;
}
}
}
}