Spring 事务分析 前置知识 在阅读此文章前, 了解一下基础知识有助于阅读
事务的概念
事务的隔离级别
JDBC基础
Spring bean 的生命周期
创建代理对象 有了解过spring bean生命周期知道, 代理对象的创建一般在实在初始化后, 在BeanPostProcessor类中的applyBeanPostProcessorsAfterInitialization方法实现的
在AbstractAutoProxyCreator中的applyBeanPostProcessorsAfterInitialization中, 会调用wrapIfNecessary, 在wrapIfNecessary方法中, spring通过getAdvicesAndAdvisorsForBean方法获取了作用于当前bean的specificInterceptors(例如: BeanFactoryTransactionAttributeSourceAdvisor) ,然后将该specificInterceptors放入到proxyFactory中, 通过proxyFactory去创建代理对象
方法调用 spring如何通过ReflectiveMethodInvocation调用其他的Interceptor, 可以查看
Spring aop 源码分析(2)
TransactionInterceptor spring通过代理最先调用了TransactionInterceptor这个关键Interceptor
1 2 3 4 5 6 7 8 9 10 public Object invoke (MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null ); return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }
invokeWithinTransaction 进入到这个方法中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 protected Object invokeWithinTransaction (Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { TransactionAttributeSource tas = getTransactionAttributeSource(); final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null ); final TransactionManager tm = determineTransactionManager(txAttr); if (this .reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) { ReactiveTransactionSupport txSupport = this .transactionSupportCache.computeIfAbsent(method, key -> { if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) { ... } ReactiveAdapter adapter = this .reactiveAdapterRegistry.getAdapter(method.getReturnType()); if (adapter == null ) { ... } return new ReactiveTransactionSupport (adapter); }); return txSupport.invokeWithinTransaction( method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm); } PlatformTransactionManager ptm = asPlatformTransactionManager(tm); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) { TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification); Object retVal; try { retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) { TransactionStatus status = txInfo.getTransactionStatus(); if (status != null && txAttr != null ) { retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } } commitTransactionAfterReturning(txInfo); return retVal; } else { Object result; final ThrowableHolder throwableHolder = new ThrowableHolder (); try { result = ((CallbackPreferringPlatformTransactionManager) ptm). execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr,joinpointIdentification, status); try { Object retVal = invocation.proceedWithInvocation(); if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)){ retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status); } return retVal; } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException (ex); } } else { throwableHolder.throwable = ex; return null ; } } finally { cleanupTransactionInfo(txInfo); } }); } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null ) { ... } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null ) { ... } throw ex2; } if (throwableHolder.throwable != null ) { throw throwableHolder.throwable; } return result; } }
createTransactionIfNecessary 该方法是创建事务的开始
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 protected TransactionInfo createTransactionIfNecessary (@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { if (txAttr != null && txAttr.getName() == null ) { txAttr = new DelegatingTransactionAttribute (txAttr) { @Override public String getName () { return joinpointIdentification; } }; } TransactionStatus status = null ; if (txAttr != null ) { if (tm != null ) { status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { ... } } } return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
getTransaction 进入到getTransaction方法中, 该方法会处理已经有的事务或者开启新的事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Override public final TransactionStatus getTransaction (@Nullable TransactionDefinition definition) throws TransactionException { TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults()); Object transaction = doGetTransaction(); boolean debugEnabled = logger.isDebugEnabled(); if (isExistingTransaction(transaction)) { return handleExistingTransaction(def, transaction, debugEnabled); } if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException ... } if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException ... } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || def.getPropagationBehavior() == TransactionDefinition. PROPAGATION_REQUIRES_NEW ||def.getPropagationBehavior() == TransactionDefinition. PROPAGATION_NESTED) { SuspendedResourcesHolder suspendedResources = suspend(null ); if (debugEnabled) { ... } try { return startTransaction(def, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error ex) { resume(null , suspendedResources); throw ex; } } else { if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { ... } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(def, null , true , newSynchronization, debugEnabled, null ); } }
handleExistingTransaction 处理已经存在的事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 private TransactionStatus handleExistingTransaction ( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException .. } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction" ); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null , false , newSynchronization, debugEnabled, suspendedResources); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { ... } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { return startTransaction(definition, transaction, debugEnabled, suspendedResources); } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException ... } if (debugEnabled) { ... } if (useSavepointForNestedTransaction()) { DefaultTransactionStatus status = prepareTransactionStatus( definition, transaction, false , false , debugEnabled, null ); status.createAndHoldSavepoint(); return status; } else { return startTransaction(definition, transaction, debugEnabled, null ); } } if (debugEnabled) { ... } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager. getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException ... } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException ... } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false , newSynchronization, debugEnabled, null ); }
startTransaction 该方法主要是创建新的事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private TransactionStatus startTransaction (TransactionDefinition definition, Object transaction,boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true , newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; }
dobegin 开启事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 @Override protected void doBegin (Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null ; try { if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { ... } txObject.setConnectionHolder(new ConnectionHolder (newCon), true ); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true ); con = txObject.getConnectionHolder().getConnection(); Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); txObject.setReadOnly(definition.isReadOnly()); if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true ); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit" ); } con.setAutoCommit(false ); } prepareTransactionalConnection(con, definition); txObject.getConnectionHolder().setTransactionActive(true ); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, obtainDataSource()); txObject.setConnectionHolder(null , false ); } throw new CannotCreateTransactionException ... } }
completeTransactionAfterThrowing 当调用方法的过程中抛出异常, 可能需要回滚, 也可能提交事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 protected void completeTransactionAfterThrowing (@Nullable TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getTransactionStatus() != null ) { if (logger.isTraceEnabled()) { ... } if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { ... } catch (RuntimeException | Error ex2) { ... throw ex2; } } else { try { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } catch (TransactionSystemException ex2) { ... throw ex2; } catch (RuntimeException | Error ex2) { ... throw ex2; } } } }
rollback 事务进行回滚
1 2 3 4 5 6 7 8 9 @Override public final void rollback (TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException ... } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; processRollback(defStatus, false ); }
processRollback方法
选择savepoint回滚或者直接回滚
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 private void processRollback (DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { triggerBeforeCompletion(status); if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Rolling back transaction to savepoint" ); } status.rollbackToHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction rollback" ); } doRollback(status); } else { if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { ... } doSetRollbackOnly(status); } else { if (status.isDebug()) { ... } } } else { ... } if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false ; } } } catch (RuntimeException | Error ex) { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); throw ex; } triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); if (unexpectedRollback) { throw new UnexpectedRollbackException ... } } finally { cleanupAfterCompletion(status); } }
commitTransactionAfterReturning 方法完成之后需要提交事务
1 2 3 4 5 6 7 8 9 protected void commitTransactionAfterReturning (@Nullable TransactionInfo txInfo) { if (txInfo != null && txInfo.getTransactionStatus() != null ) { if (logger.isTraceEnabled()) { ... } txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } }
commit 提交事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public final void commit (TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException ... } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback" ); } processRollback(defStatus, false ); return ; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { ... } processRollback(defStatus, true ); return ; } processCommit(defStatus); }
processCommit 处理提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 private void processCommit (DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false ; try { boolean unexpectedRollback = false ; prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true ; if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint" ); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit" ); } unexpectedRollback = status.isGlobalRollbackOnly(); doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } if (unexpectedRollback) { throw new UnexpectedRollbackException ... } } catch (UnexpectedRollbackException ex) { ... } catch (TransactionException ex) { ... } catch (RuntimeException | Error ex) { ... } try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } }
到此大致的整个事务过程就算完成了, 但是spring事务还有很多细节需要我们去揣摩
声明 如有错误请联系作者更正