Spring 事务分析

前置知识

在阅读此文章前, 了解一下基础知识有助于阅读

  • 事务的概念

  • 事务的隔离级别

  • JDBC基础

  • Spring bean 的生命周期

创建代理对象

了解过 Spring bean 生命周期的读者知道, 代理对象的创建一般发生在实例初始化之后: 容器在 applyBeanPostProcessorsAfterInitialization 方法中依次调用各个 BeanPostProcessor 的 postProcessAfterInitialization 方法, 代理对象就是在这一步创建的

AbstractAutoProxyCreator 的 postProcessAfterInitialization 方法中, 会调用 wrapIfNecessary; 在 wrapIfNecessary 方法中, spring 通过 getAdvicesAndAdvisorsForBean 方法获取作用于当前 bean 的 specificInterceptors (例如: BeanFactoryTransactionAttributeSourceAdvisor), 然后将该 specificInterceptors 放入到 proxyFactory 中, 通过 proxyFactory 去创建代理对象

方法调用

spring如何通过ReflectiveMethodInvocation调用其他的Interceptor, 可以查看

Spring aop 源码分析(2)

TransactionInterceptor

spring通过代理最先调用了TransactionInterceptor这个关键Interceptor

1
2
3
4
5
6
7
8
9
10
/**
 * Interceptor entry point: works out the target class of the invocation and
 * hands the call over to invokeWithinTransaction.
 *
 * @param invocation the intercepted method invocation
 * @return whatever invokeWithinTransaction returns
 * @throws Throwable anything propagated from invokeWithinTransaction
 */
public Object invoke(MethodInvocation invocation) throws Throwable {

// targetClass stays null when the invocation carries no target object.
Class<?> targetClass = (invocation.getThis() != null ?
AopUtils.getTargetClass(invocation.getThis()) : null);

// This call starts the whole transactional flow; invocation::proceed is
// passed as the callback that continues the intercepted call.
return invokeWithinTransaction(invocation.getMethod(),
targetClass,
invocation::proceed);
}

invokeWithinTransaction

进入到这个方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/**
 * Runs the given invocation callback inside a transaction.
 *
 * Three paths are visible in this excerpt: a reactive path (when the manager
 * is a ReactiveTransactionManager), the standard path (create transaction,
 * invoke, then commit or complete-after-throwing), and a callback path for
 * CallbackPreferringPlatformTransactionManager.
 *
 * @param method the method being invoked
 * @param targetClass the target class, may be null
 * @param invocation callback used to proceed with the actual invocation
 * @return the return value of the invocation
 * @throws Throwable whatever the invocation or transaction handling throws
 */
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {

// Obtain the TransactionAttributeSource.
TransactionAttributeSource tas = getTransactionAttributeSource();
// Look up the transaction attribute for this method (per the article: timeout,
// propagation behaviour, etc.); null when no attribute source is available.
final TransactionAttribute txAttr = (tas != null ?
tas.getTransactionAttribute(method, targetClass) : null);
// Resolve the transaction manager that will drive this transaction.
final TransactionManager tm = determineTransactionManager(txAttr);

// Reactive transaction path.
if (this.reactiveAdapterRegistry != null
&& tm instanceof ReactiveTransactionManager) {
// One ReactiveTransactionSupport is cached per method.
ReactiveTransactionSupport txSupport =
this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) &&
KotlinDelegate.isSuspend(method)) {
...
}
ReactiveAdapter adapter =
this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
...
}
return new ReactiveTransactionSupport(adapter);
});
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}

PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification =
methodIdentification(method, targetClass, txAttr);

// Standard path: taken when there is no transaction attribute, or the manager
// is not a CallbackPreferringPlatformTransactionManager.
if (txAttr == null
|| !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {

// Create the transaction (if one is needed).
TransactionInfo txInfo =
createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

Object retVal;
try {
// Invoke the business method and capture its return value;
// per the article this usually proceeds down an interceptor chain.
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// Complete the transaction for the failure case, then rethrow.
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// Clean up the transaction info (per the article: removes the thread binding).
cleanupTransactionInfo(txInfo);
}

if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}

// Normal completion: commit.
commitTransactionAfterReturning(txInfo);
return retVal;
}
// Callback path: the manager is a CallbackPreferringPlatformTransactionManager
// and the invocation runs inside its execute(...) callback.
else {
Object result;
// Carries a non-rollback exception out of the callback so it can be rethrown below.
final ThrowableHolder throwableHolder = new ThrowableHolder();


try {
result = ((CallbackPreferringPlatformTransactionManager) ptm).
execute(txAttr, status -> {
TransactionInfo txInfo =
prepareTransactionInfo(ptm, txAttr,joinpointIdentification, status);
try {
Object retVal = invocation.proceedWithInvocation();
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)){

retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
return retVal;
}
catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
// A RuntimeException: will lead to a rollback.
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
}
else {
// Wrap other throwables; unwrapped again in the outer catch.
throw new ThrowableHolderException(ex);
}
}
else {
// A normal return value: will lead to a commit.
throwableHolder.throwable = ex;
return null;
}
}
finally {
cleanupTransactionInfo(txInfo);
}
});
}
catch (ThrowableHolderException ex) {
// Unwrap and rethrow the original throwable.
throw ex.getCause();
}
catch (TransactionSystemException ex2) {
if (throwableHolder.throwable != null) {
...
}
throw ex2;
}
catch (Throwable ex2) {
if (throwableHolder.throwable != null) {
...
}
throw ex2;
}

// Check result state: It might indicate a Throwable to rethrow.
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
}
}

createTransactionIfNecessary

该方法是创建事务的开始

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Obtains a TransactionStatus from the manager when a transaction attribute is
 * present, and wraps the result in a TransactionInfo.
 *
 * @param tm the transaction manager, may be null
 * @param txAttr the transaction attribute, may be null (no status is requested then)
 * @param joinpointIdentification used as the transaction name when txAttr has none
 * @return the TransactionInfo built by prepareTransactionInfo
 */
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}

// Stays null when there is no attribute or no transaction manager.
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// Ask the transaction manager for the transaction.
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
...
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

getTransaction

进入到getTransaction方法中, 该方法会处理已经有的事务或者开启新的事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * Returns a TransactionStatus for the given definition: delegates to
 * handleExistingTransaction when a transaction already exists, otherwise
 * throws, starts a new transaction, or prepares a status without a
 * transaction, depending on the propagation behaviour.
 *
 * @param definition the transaction definition; defaults are used when null
 * @return the resulting transaction status
 * @throws TransactionException on an invalid timeout or illegal propagation state
 */
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {

// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// A transaction already exists.
if (isExistingTransaction(transaction)) {
// Decide how to behave relative to the existing transaction.
return handleExistingTransaction(def, transaction, debugEnabled);
}

// From here on no transaction exists.
// Reject timeouts below TIMEOUT_DEFAULT.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException...
}
// MANDATORY requires an existing transaction; none was found above, so throw.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException...
}
// REQUIRED, REQUIRES_NEW or NESTED: start a new transaction.
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED
|| def.getPropagationBehavior() == TransactionDefinition.
PROPAGATION_REQUIRES_NEW
||def.getPropagationBehavior() == TransactionDefinition.
PROPAGATION_NESTED) {
// suspend(null): there is no transaction object to suspend here.
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
...
}
try {
// Create the new transaction.
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
// Starting failed: resume whatever was suspended, then rethrow.
resume(null, suspendedResources);
throw ex;
}
}
else {
// Remaining propagation behaviours: no actual transaction is started.

if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
...
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// Prepare a status with a null transaction object.
return prepareTransactionStatus(def, null, true, newSynchronization,
debugEnabled, null);
}
}

handleExistingTransaction

处理已经存在的事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
 * Builds a TransactionStatus for a call made while a transaction already
 * exists, branching on the definition's propagation behaviour.
 *
 * @param definition the transaction definition of the current call
 * @param transaction the existing transaction object
 * @param debugEnabled whether debug logging is enabled
 * @return the resulting transaction status
 * @throws TransactionException when the propagation behaviour or validation forbids participating
 */
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// Propagation NEVER: an existing transaction is not allowed, so throw.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException..
}
// Propagation NOT_SUPPORTED: suspend the current transaction and return a
// status with a null transaction object.
if (definition.getPropagationBehavior() ==
TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// Propagation REQUIRES_NEW: suspend the current transaction and start a new one.
if (definition.getPropagationBehavior() ==
TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
...
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled,
suspendedResources);
}
catch (RuntimeException | Error beginEx) {
// Starting failed: resume the suspended transaction, then rethrow.
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// Propagation NESTED.
if (definition.getPropagationBehavior() ==
TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException...
}
if (debugEnabled) {
...
}
if (useSavepointForNestedTransaction()) {
// Stay in the existing transaction and hold a savepoint on the status.
DefaultTransactionStatus status =
prepareTransactionStatus(
definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// No savepoint: start a transaction without suspending anything.
return startTransaction(definition, transaction, debugEnabled, null);
}
}


if (debugEnabled) {
...
}
// Remaining case: SUPPORTS or REQUIRED, i.e. participate in the existing transaction.
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (isValidateExistingTransaction()) {
// The requested isolation level must match the current transaction's level.
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.
getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel !=
definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException...
}
}
// A non-read-only definition may not join a read-only transaction.
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException...
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// newTransaction flag is false: this status joins the existing transaction.
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

startTransaction

该方法主要是创建新的事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Starts a new transaction: creates a status flagged as a new transaction,
 * calls doBegin, then prepares synchronization.
 *
 * @param definition the transaction definition
 * @param transaction the transaction object passed on to doBegin
 * @param debugEnabled whether debug logging is enabled
 * @param suspendedResources resources suspended by the caller, may be null
 * @return the status of the newly started transaction
 */
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

boolean newSynchronization = (getTransactionSynchronization() !=
SYNCHRONIZATION_NEVER);
// Status for the new transaction (newTransaction flag is true).
DefaultTransactionStatus status =
newTransactionStatus(definition, transaction, true, newSynchronization,
debugEnabled, suspendedResources);
// Begin the transaction.
doBegin(transaction, definition);

prepareSynchronization(status, definition);
return status;
}
doBegin

开启事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
 * Begins a JDBC transaction on the given transaction object: obtains a
 * connection if needed, prepares it, switches off auto-commit, marks the
 * holder active, applies the timeout, and binds a new holder to the
 * DataSource through TransactionSynchronizationManager.
 *
 * @param transaction the transaction object (cast to DataSourceTransactionObject)
 * @param definition the transaction definition (isolation, read-only, timeout)
 */
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;

try {
// No ConnectionHolder yet, or the existing holder is already synchronized
// with a transaction: fetch a new connection from the DataSource.
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
...
}
// Wrap the new connection in a fresh ConnectionHolder.
// NOTE(review): the `true` flag presumably marks the holder as new
// (cf. isNewConnectionHolder() below) — confirm.
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}

txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();

// Remember the value returned by connection preparation as the previous isolation level.
Integer previousIsolationLevel =
DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// Record whether the transaction is read-only.
txObject.setReadOnly(definition.isReadOnly());


if (con.getAutoCommit()) {
// Remember that auto-commit was on so it can be restored later.
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// Key step: turn off auto-commit so statements are not committed automatically.
con.setAutoCommit(false);
}

prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);

// Apply a timeout only when it differs from the default.
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}

// Bind a newly created holder to the DataSource (per the article: stored
// via ThreadLocal for the current thread).
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(),
txObject.getConnectionHolder());
}
}

catch (Throwable ex) {
// On failure, release a connection this call obtained and clear the holder.
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException...
}
}

completeTransactionAfterThrowing

当调用方法的过程中抛出异常, 可能需要回滚, 也可能提交事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Completes the transaction after the invocation threw: rolls back when the
 * transaction attribute says to roll back on this throwable, otherwise commits.
 * Does nothing when there is no TransactionInfo or no transaction status.
 *
 * @param txInfo information about the current transaction, may be null
 * @param ex the throwable raised by the invocation
 */
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
...
}
// The attribute's rollbackOn(ex) decides between rollback and commit.
if (txInfo.transactionAttribute != null &&
txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// Attempt the rollback.
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
...
}
catch (RuntimeException | Error ex2) {
...
throw ex2;
}
}
else {
// No rollback for this throwable: commit the transaction instead.

try {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
catch (TransactionSystemException ex2) {
...
throw ex2;
}
catch (RuntimeException | Error ex2) {
...
throw ex2;
}
}
}
}

rollback

事务进行回滚

1
2
3
4
5
6
7
8
9
/**
 * Rolls back the transaction described by the given status by delegating to
 * processRollback with unexpected = false.
 *
 * @param status the transaction status to roll back
 * @throws TransactionException when the transaction is already completed
 */
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
// An already completed transaction cannot be rolled back again.
if (status.isCompleted()) {
throw new IllegalTransactionStateException...
}

DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus, false);
}

processRollback方法

选择savepoint回滚或者直接回滚

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 * Performs the rollback: rolls back to a held savepoint, rolls back a new
 * transaction entirely, or (when merely participating) marks the surrounding
 * transaction rollback-only. Completion callbacks are triggered around it and
 * cleanupAfterCompletion always runs.
 *
 * @param status the status of the transaction being rolled back
 * @param unexpected whether an UnexpectedRollbackException should be raised afterwards
 */
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;

try {
triggerBeforeCompletion(status);
// A savepoint is held: roll back to it.
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
// Article note: this ends up as con.rollback(savepoint).
status.rollbackToHeldSavepoint();
}
// This status owns a new transaction: roll the whole transaction back.
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// Article note: this ends up as con.rollback().
doRollback(status);
}
else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() ||
isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
...
}
// Mark rollback-only instead of rolling back here.
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
...
}
}
}
else {
...
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
// The rollback itself failed: report an unknown completion status, then rethrow.
triggerAfterCompletion(status,
TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}

triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException...
}
}
finally {
// Runs on every exit path, including the exceptional ones.
cleanupAfterCompletion(status);
}
}

commitTransactionAfterReturning

方法完成之后需要提交事务

1
2
3
4
5
6
7
8
9
/**
 * Commits after the invocation returned normally. Does nothing when there is
 * no TransactionInfo or no transaction status.
 *
 * @param txInfo information about the current transaction, may be null
 */
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
...
}
// Commit through the TransactionManager held by the TransactionInfo.
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}

commit

提交事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Commits the transaction described by the given status, unless a
 * rollback-only marker turns the commit into a rollback.
 *
 * @param status the transaction status to commit
 * @throws TransactionException when the transaction is already completed
 */
public final void commit(TransactionStatus status) throws TransactionException {
// The transaction has already completed: committing again is illegal.
if (status.isCompleted()) {
throw new IllegalTransactionStateException...
}

DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// A local rollback-only marker is set: roll back instead of committing.
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
processRollback(defStatus, false);
return;
}
// A global rollback-only marker is set and committing in that state is not
// enabled: roll back, flagged as unexpected.
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
...
}
processRollback(defStatus, true);
return;
}

processCommit(defStatus);
}

processCommit

处理提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
 * Performs the commit: triggers the before-commit and before-completion
 * callbacks, then releases a held savepoint or commits a new transaction,
 * and finally triggers the after-commit and after-completion callbacks.
 * cleanupAfterCompletion always runs.
 *
 * @param status the status of the transaction being committed
 * @throws TransactionException on commit failure
 */
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;

try {
boolean unexpectedRollback = false;
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// A savepoint is held.
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// Release the savepoint.
status.releaseHeldSavepoint();
}
// This status owns a new transaction: do the real commit.
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// Article note: this ends up as con.commit().
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}

// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException...
}
}
catch (UnexpectedRollbackException ex) {
...
}
catch (TransactionException ex) {
...
}
catch (RuntimeException | Error ex) {
...
}

// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
// The finally clause makes afterCompletion run even if afterCommit throws.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}

}
finally {
// Runs on every exit path.
cleanupAfterCompletion(status);
}
}

到此大致的整个事务过程就算完成了, 但是spring事务还有很多细节需要我们去揣摩

声明

如有错误请联系作者更正