05-TX
mandatory 英 [ˈmændətəri] adj. 强制性的;强制的;法定的;义务的
suspend 英 [səˈspend] vt.悬;挂;吊;暂停;中止
/**
 * Demo entry point for XML-configured declarative transactions.
 * <p>
 * Loads {@code x21_tx.xml}, looks up the {@code txService} bean and invokes
 * {@code updService()} on it.
 */
public class T21_1_Tx_xml {
    /**
     * Boots the XML application context and runs the transactional service call.
     *
     * @param args unused
     * @throws SQLException declared for the service call chain
     */
    public static void main(String[] args) throws SQLException {
        // System.setProperty(DebuggingClassWriter.DEBUG_LOCATION_PROPERTY, System.getProperty("user.dir") + "/aop_tx");
        // try-with-resources: the context is closed even when the service call throws,
        // which the previous trailing ac.close() did not guarantee.
        try (ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext("x21_tx.xml")) {
            System.out.println("------------------- ac.over -------------------");
            TxService txService = ac.getBean("txService", TxService.class);
            txService.updService();
            // txService.exceptionService();
        }
    }
}
<!-- Resolves the ${jdbc.*} placeholders below from classpath:p4_db_tx.properties -->
<context:property-placeholder location="classpath:p4_db_tx.properties"/>
<!-- Druid connection pool; every connection setting comes from the properties file -->
<bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
<property name="username" value="${jdbc.username}"/>
<property name="password" value="${jdbc.password}"/>
<property name="url" value="${jdbc.url}"/>
<property name="driverClassName" value="${jdbc.driverClassName}"/>
</bean>
<!-- JdbcTemplate built on the pooled data source (constructor injection) -->
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<constructor-arg name="dataSource" ref="dataSource"/>
</bean>
<!-- DAO: receives the JdbcTemplate via setter injection -->
<bean id="txDao" class="com.listao.tx.TxDao">
<property name="jdbcTemplate" ref="jdbcTemplate"/>
</bean>
<!-- Service: receives the DAO via setter injection -->
<bean id="txService" class="com.listao.tx.TxService">
<property name="txDao" ref="txDao"/>
</bean>
<!-- Transaction configuration -->
<aop:config>
<!-- 1.1. Pointcut (AspectJExpressionPointcut): any method of any type directly in package com.listao.tx -->
<aop:pointcut id="txPoint" expression="execution(* com.listao.tx.*.*(..))"/>
<!-- 1. Advisor (DefaultBeanFactoryPointcutAdvisor): binds advice "myAdvice" to pointcut "txPoint" -->
<aop:advisor advice-ref="myAdvice" pointcut-ref="txPoint"/>
</aop:config>
<!-- Transaction manager working against the same dataSource bean -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<!-- 1.2. Advice (TransactionInterceptor), backed by 2.1. DataSourceTransactionManager -->
<tx:advice id="myAdvice" transaction-manager="transactionManager">
<!-- 2.2. Attributes are looked up by method name (NameMatchTransactionAttributeSource) -->
<tx:attributes>
<!-- Propagation options: REQUIRED, REQUIRES_NEW, MANDATORY, NESTED, NEVER, NOT_SUPPORTED, SUPPORTS -->
<tx:method name="updService" propagation="REQUIRED"/>
<tx:method name="updDao" propagation="REQUIRED"/>
<!--<tx:method name="exceptionService" propagation="REQUIRED"/>-->
<!--<tx:method name="exceptionDao" propagation="REQUIRED"/>-->
<!--<tx:method name="exceptionDao" propagation="NESTED"/>-->
</tx:attributes>
</tx:advice>
<!--<tx:annotation-driven/>-->
1. xml => BD
1. <aop:config>
- AopNamespaceHandler => ConfigBeanDefinitionParser
parsePointcut()
=> AspectJExpressionPointcut
parseAdvisor()
=> DefaultBeanFactoryPointcutAdvisor
ConfigBeanDefinitionParser#parse()
2. <tx:advice>
- TxNamespaceHandler => TxAdviceBeanDefinitionParser
getBeanClass()
=> TransactionInterceptor
TxAdviceBeanDefinitionParser#doParse()
/**
 * Abridged excerpt: the parser these notes associate with the {@code <tx:advice>} element.
 */
class TxAdviceBeanDefinitionParser extends AbstractSingleBeanDefinitionParser {
// The bean definition produced for the element is of type TransactionInterceptor.
protected Class<?> getBeanClass(Element element) {
return TransactionInterceptor.class;
}
// Populates the bean definition from the element; the body is elided in these notes.
protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
//...
}
}
2. java_bean
invokeBFPP()
,解析并填充classpath:p4_db_tx.properties
<aop:config>
=>internalAutoProxyCreator => AspectJAwareAdvisorAutoProxyCreator
1. DefaultBFPointcutAdvisor
<!-- Transaction configuration; per these notes, <aop:config> registers internalAutoProxyCreator -->
<aop:config>
<!-- 1.1. Pointcut (AspectJExpressionPointcut): any method of any type directly in package com.listao.tx -->
<aop:pointcut id="txPoint" expression="execution(* com.listao.tx.*.*(..))"/>
<!-- 1. Advisor (DefaultBeanFactoryPointcutAdvisor): binds advice "myAdvice" to pointcut "txPoint" -->
<aop:advisor advice-ref="myAdvice" pointcut-ref="txPoint"/>
</aop:config>
<!-- Transaction manager working against the dataSource bean -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<!-- 1.2. Advice (TransactionInterceptor), backed by 2.1. DataSourceTransactionManager -->
<tx:advice id="myAdvice" transaction-manager="transactionManager">
<!-- 2.2. Attributes are looked up by method name (NameMatchTransactionAttributeSource) -->
<tx:attributes>
<!-- Propagation options: REQUIRED, REQUIRES_NEW, MANDATORY, NESTED, NEVER, NOT_SUPPORTED, SUPPORTS -->
<tx:method name="updService" propagation="REQUIRED"/>
<tx:method name="updDao" propagation="REQUIRED"/>
<!--<tx:method name="exceptionService" propagation="REQUIRED"/>-->
<!--<tx:method name="exceptionDao" propagation="REQUIRED"/>-->
<!--<tx:method name="exceptionDao" propagation="NESTED"/>-->
</tx:attributes>
</tx:advice>
<aop:advisor>
=>DefaultBeanFactoryPointcutAdvisor
- Advice advice => "myAdvice" =>
TransactionInterceptor
- Pointcut pointcut => "txPoint" =>
AspectJExpressionPointcut
- Advice advice => "myAdvice" =>
<tx:advice>
=>TransactionInterceptor
- TransactionManager transactionManager =>
DataSourceTransactionManager
- TransactionAttributeSource transactionAttributeSource =>
NameMatchTransactionAttributeSource
=><tx:attributes>
- Map<String, TransactionAttribute> nameMap
- updService => {
RuleBasedTransactionAttribute
@2667} =><tx:method/>
- updDao => {
RuleBasedTransactionAttribute
@2665} =><tx:method/>
- updService => {
- Map<String, TransactionAttribute> nameMap
- TransactionManager transactionManager =>
AbstractAdvisorAutoProxyCreator#findEligibleAdvisors()
1. <aop:advisor>
DefaultBeanFactoryPointcutAdvisor
AbstractAutowireCapableBeanFactory#applyPropertyValues()
2. <tx:advice>
TransactionInterceptor
实现了MethodInterceptor
接口,直接将其当做Advice
- 在对第一个对象TxDao进行
postProcessAfterInitialization()
,实例化advice
AbstractAutowireCapableBeanFactory#applyPropertyValues()
3. anno
/**
 * Demo entry point for annotation-configured declarative transactions.
 * <p>
 * Registers {@code TxConfig}, refreshes the context, then calls
 * {@code updDao(2)} on the {@code TxDao} bean.
 */
public class T21_2_Tx_Anno {
    /**
     * Boots the annotation-based application context and runs the DAO call.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // try-with-resources: the context is closed even when refresh() or the DAO call throws.
        try (AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext()) {
            // Register the configuration class with the container; per the original note,
            // a class containing @Bean methods is treated as a configuration class.
            ac.register(TxConfig.class);
            ac.refresh();
            // Printed once the context is actually initialised, matching the XML demo
            // (previously it was printed before register()/refresh()).
            System.out.println("------------------- ac.over -------------------");
            TxDao txDao = ac.getBean(TxDao.class);
            txDao.updDao(2);
        }
    }
}
TxConfig.class
=> BD => IOC
/*
 * Java-config counterpart of the XML setup: data source, JdbcTemplate, transaction manager and DAO.
 * JDBC settings are read from classpath:p4_db_tx.properties.
 */
// @Configuration is left commented out; per the original note, a class containing @Bean methods is still treated as a configuration class
@PropertySource("classpath:p4_db_tx.properties")
@EnableTransactionManagement
public class TxConfig {
// Connection settings injected from the property source
@Value("${jdbc.driverClassName}")
private String driverClassName;
@Value("${jdbc.url}")
private String url;
@Value("${jdbc.username}")
private String username;
@Value("${jdbc.password}")
private String password;
// Druid connection pool configured from the injected settings
@Bean
public DataSource dataSource() {
DruidDataSource dds = new DruidDataSource();
dds.setDriverClassName(driverClassName);
dds.setUrl(url);
dds.setUsername(username);
dds.setPassword(password);
return dds;
}
// JdbcTemplate over the DataSource passed in as a method parameter
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
// DataSource-based transaction manager
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
/*
 * @Bean registers txDao in the IoC container; per the original note, @Autowired injects
 * jdbcTemplate into txDao (TxDao itself is not shown in this section).
 */
@Bean
public TxDao txDao() {
return new TxDao();
}
}
/**
 * Type-level annotation that imports {@link TransactionManagementConfigurationSelector}.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
// Defaults to false
boolean proxyTargetClass() default false;
// Advice mode; defaults to PROXY
AdviceMode mode() default AdviceMode.PROXY;
// Defaults to Ordered.LOWEST_PRECEDENCE
int order() default Ordered.LOWEST_PRECEDENCE;
}
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
/**
 * PROXY is the default mode; the alternative is ASPECTJ.
 * <p>
 * Returns {@link ProxyTransactionManagementConfiguration} or
 * {@code AspectJ(Jta)TransactionManagementConfiguration} for {@code PROXY}
 * and {@code ASPECTJ} values of {@link EnableTransactionManagement#mode()},
 * respectively.
 */
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
// org.springframework.context.annotation.AutoProxyRegistrar
// org.springframework.transaction.annotation.ProxyTransactionManagementConfiguration
return new String[]{AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[]{determineTransactionAspectClass()};
default:
return null;
}
}
// Picks the JTA-aware aspect configuration when javax.transaction.Transactional is on the classpath,
// otherwise the plain transaction aspect configuration.
private String determineTransactionAspectClass() {
return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
}
}
TransactionManagementConfigurationSelector#selectImports()
class ConfigurationClassParser {
/*
 * Processes the import candidates of a configuration class. Each candidate is handled as
 * (1) an ImportSelector, (2) an ImportBeanDefinitionRegistrar, or (3) a plain configuration class.
 */
private void processImports(ConfigurationClass configClass, SourceClass currentSourceClass,
Collection<SourceClass> importCandidates, Predicate<String> exclusionFilter,
boolean checkForCircularImports) {
if (importCandidates.isEmpty()) {
return;
}
// Circular imports are detected with a stack (importStack)
if (checkForCircularImports && isChainedImportOnStack(configClass)) {
this.problemReporter.error(new CircularImportProblem(configClass, this.importStack));
} else {
this.importStack.push(configClass);
try {
for (SourceClass candidate : importCandidates) {
// 1. Candidate `implements ImportSelector`
if (candidate.isAssignable(ImportSelector.class)) {
// Candidate class is an ImportSelector -> delegate to it to determine imports
Class<?> candidateClass = candidate.loadClass();
// Instantiate the ImportSelector reflectively
ImportSelector selector = ParserStrategyUtils.instantiateClass(candidateClass, ImportSelector.class,
this.environment, this.resourceLoader, this.registry);
Predicate<String> selectorFilter = selector.getExclusionFilter();
if (selectorFilter != null) {
exclusionFilter = exclusionFilter.or(selectorFilter);
}
// Deferred selectors are processed only after all configuration classes have been loaded
if (selector instanceof DeferredImportSelector) {
// Hand the selector to deferredImportSelectorHandler; it is processed in one pass once all configuration classes are loaded
this.deferredImportSelectorHandler.handle(configClass, (DeferredImportSelector) selector);
} else {
// 1. Ask the selector for the class names to import
String[] importClassNames = selector.selectImports(currentSourceClass.getMetadata());
Collection<SourceClass> importSourceClasses = asSourceClasses(importClassNames, exclusionFilter);
// Recurse into the selected imports
processImports(configClass, currentSourceClass, importSourceClasses, exclusionFilter, false);
}
}
// 2.1. Candidate `implements ImportBeanDefinitionRegistrar` (contributes bean definitions)
// 2.2. e.g. @EnableAspectJAutoProxy
else if (candidate.isAssignable(ImportBeanDefinitionRegistrar.class)) {
// Candidate class is an ImportBeanDefinitionRegistrar ->
// delegate to it to register additional bean definitions
Class<?> candidateClass = candidate.loadClass();
ImportBeanDefinitionRegistrar registrar =
ParserStrategyUtils.instantiateClass(candidateClass, ImportBeanDefinitionRegistrar.class,
this.environment, this.resourceLoader, this.registry);
// 2.3. Stored on configClass (importBeanDefinitionRegistrars)
configClass.addImportBeanDefinitionRegistrar(registrar, currentSourceClass.getMetadata());
}
// 3. Otherwise treat the candidate as a configuration class and keep processing
else {
// Candidate class not an ImportSelector or ImportBeanDefinitionRegistrar ->
// process it as an @Configuration class
this.importStack.registerImport(
currentSourceClass.getMetadata(), candidate.getMetadata().getClassName());
processConfigurationClass(candidate.asConfigClass(configClass), exclusionFilter);
}
}
} catch (BeanDefinitionStoreException ex) {
throw ex;
} catch (Throwable ex) {
throw new BeanDefinitionStoreException(
"Failed to process import candidates for configuration class [" +
configClass.getMetadata().getClassName() + "]", ex);
} finally {
this.importStack.pop();
}
}
}
}
1. AutoProxyRegistrar
// Abridged excerpt: registers the auto-proxy creator bean definition if necessary.
public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
// 1. Per these notes, the creator registered on this path is InfrastructureAdvisorAutoProxyCreator
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
}
}
// AnnotationConfigUtils注入的internal类
1. internalConfigurationAnnotationProcessor => ConfigurationClassPostProcessor
2. internalAutowiredAnnotationProcessor => AutowiredAnnotationBeanPostProcessor
3. internalCommonAnnotationProcessor => CommonAnnotationBeanPostProcessor
4. internalEventListenerProcessor => EventListenerMethodProcessor
5. internalEventListenerFactory => DefaultEventListenerFactory
// AopConfigUtils注入的internal类
6. internalAutoProxyCreator =>
1. AspectJAwareAdvisorAutoProxyCreator => xml
2. AnnotationAwareAspectJAutoProxyCreator => <aop:aspectj-autoproxy/>, @EnableAspectJAutoProxy
3. InfrastructureAdvisorAutoProxyCreator => anno
2. ProxyTrxManagementCfgt
xml | anno |
---|---|
AspectJExpressionPointcut | TransactionAttributeSourcePointcut |
DefaultBeanFactoryPointcutAdvisor | BeanFactoryTransactionAttributeSourceAdvisor |
TransactionInterceptor | TransactionInterceptor |
NameMatchTransactionAttributeSource | AnnotationTransactionAttributeSource |
// Infrastructure configuration declaring the three beans of annotation-driven transactions:
// the advisor, the attribute source and the interceptor.
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
/*
 * 1. internalTransactionAdvisor => BeanFactoryTransactionAttributeSourceAdvisor
 *
 * @param transactionAttributeSource <= supplied by @Bean method 2 below
 * @param transactionInterceptor <= supplied by @Bean method 3 below
 */
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
advisor.setTransactionAttributeSource(transactionAttributeSource);
advisor.setAdvice(transactionInterceptor);
// Apply the "order" attribute when enableTx is set
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
// 2. transactionAttributeSource => AnnotationTransactionAttributeSource
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
// 3. transactionInterceptor => TransactionInterceptor
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource);
// The transaction manager is set only when txManager is non-null
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
1. BFTrxAttributeSourceAdvisor
/**
 * Transaction-attribute advisor; holds the attribute source describing the
 * transactional methods.
 *
 * Advisor driven by a {@link TransactionAttributeSource}, used to include
 * a transaction advice bean for methods that are transactional.
 *
 * @author Juergen Hoeller
 * @since 2.5.5
 * @see #setAdviceBeanName
 * @see TransactionInterceptor
 * @see TransactionAttributeSourceAdvisor
 */
@SuppressWarnings("serial")
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {
@Nullable
private TransactionAttributeSource transactionAttributeSource;
// Pointcut that reads this advisor's transactionAttributeSource field on each lookup
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
@Override
@Nullable
protected TransactionAttributeSource getTransactionAttributeSource() {
return transactionAttributeSource;
}
};
/**
 * Set the {@link ClassFilter} to use for this pointcut.
 * Default is {@link ClassFilter#TRUE}.
 */
public void setClassFilter(ClassFilter classFilter) {
this.pointcut.setClassFilter(classFilter);
}
}
1. TrxAttributeSourcePointcut
/**
 * Abstract class that implements a Pointcut that matches if the underlying
 * {@link TransactionAttributeSource} has an attribute for a given method.
 *
 * @author Juergen Hoeller
 * @since 2.5.5
 */
@SuppressWarnings("serial")
abstract class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {
// Installs the class filter defined at the bottom of this class
protected TransactionAttributeSourcePointcut() {
setClassFilter(new TransactionAttributeSourceClassFilter());
}
@Override
public boolean matches(Method method, Class<?> targetClass) {
/**
 * Obtain the TransactionAttributeSource. In the annotation setup covered by these notes
 * this is the one declared in ProxyTransactionManagementConfiguration, the configuration
 * class imported into the container by @EnableTransactionManagement.
 */
TransactionAttributeSource tas = getTransactionAttributeSource();
// 1... (AbstractFallbackTransactionAttributeSource) The method matches when there is no attribute source at all, or when a transaction attribute resolves for it
return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}
/**
 * Obtain the underlying TransactionAttributeSource (may be {@code null}).
 * To be implemented by subclasses.
 */
@Nullable
protected abstract TransactionAttributeSource getTransactionAttributeSource();
/**
 * {@link ClassFilter} that delegates to {@link TransactionAttributeSource#isCandidateClass}
 * for filtering classes whose methods are not worth searching to begin with.
 */
private class TransactionAttributeSourceClassFilter implements ClassFilter {
@Override
public boolean matches(Class<?> clazz) {
// These three type families are never matched
if (TransactionalProxy.class.isAssignableFrom(clazz) ||
TransactionManager.class.isAssignableFrom(clazz) ||
PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
return false;
}
TransactionAttributeSource tas = getTransactionAttributeSource();
return (tas == null || tas.isCandidateClass(clazz));
}
}
}
2. AnnoTrxAttributeSource
// Attribute source that resolves transaction attributes through a set of annotation parsers.
@SuppressWarnings("serial")
public class AnnotationTransactionAttributeSource extends AbstractFallbackTransactionAttributeSource
implements Serializable {
// Classpath probes, evaluated once in the static initializer below
private static final boolean jta12Present;
private static final boolean ejb3Present;
static {
ClassLoader classLoader = AnnotationTransactionAttributeSource.class.getClassLoader();
jta12Present = ClassUtils.isPresent("javax.transaction.Transactional", classLoader);
ejb3Present = ClassUtils.isPresent("javax.ejb.TransactionAttribute", classLoader);
}
private final boolean publicMethodsOnly;
private final Set<TransactionAnnotationParser> annotationParsers;
/**
 * Create a default AnnotationTransactionAttributeSource, supporting
 * public methods that carry the {@code Transactional} annotation
 * or the EJB3 {@link javax.ejb.TransactionAttribute} annotation.
 */
public AnnotationTransactionAttributeSource() {
this(true);
}
/**
 * Create a custom AnnotationTransactionAttributeSource, supporting
 * public methods that carry the {@code Transactional} annotation
 * or the EJB3 {@link javax.ejb.TransactionAttribute} annotation.
 * @param publicMethodsOnly whether to support public methods that carry
 * the {@code Transactional} annotation only (typically for use
 * with proxy-based AOP), or protected/private methods as well
 * (typically used with AspectJ class weaving)
 */
public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
this.publicMethodsOnly = publicMethodsOnly;
// The Spring parser is always registered; JTA / EJB3 parsers are added only when their annotations are present
if (jta12Present || ejb3Present) {
this.annotationParsers = new LinkedHashSet<>(4);
this.annotationParsers.add(new SpringTransactionAnnotationParser());
if (jta12Present) {
this.annotationParsers.add(new JtaTransactionAnnotationParser());
}
if (ejb3Present) {
this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
}
}
else {
this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
}
}
// A class is a candidate as soon as any registered parser accepts it
@Override
public boolean isCandidateClass(Class<?> targetClass) {
for (TransactionAnnotationParser parser : this.annotationParsers) {
if (parser.isCandidateClass(targetClass)) {
return true;
}
}
return false;
}
/**
 * Finds the transaction attribute declared on the given class. Per the original note,
 * the lookup also covers superclasses and interfaces.
 * @param clazz the class to retrieve the attribute for
 * @return the transaction attribute, or {@code null} if none was found
 */
@Override
@Nullable
protected TransactionAttribute findTransactionAttribute(Class<?> clazz) {
return determineTransactionAttribute(clazz);
}
@Override
@Nullable
protected TransactionAttribute findTransactionAttribute(Method method) {
return determineTransactionAttribute(method);
}
/**
 * Determine the transaction attribute for the given method or class.
 * <p>This implementation delegates to configured
 * {@link TransactionAnnotationParser TransactionAnnotationParsers}
 * for parsing known annotations into Spring's metadata attribute class.
 * Returns {@code null} if it's not transactional.
 * <p>Can be overridden to support custom annotations that carry transaction metadata.
 * @param element the annotated method or class
 * @return the configured transaction attribute, or {@code null} if none was found
 */
@Nullable
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
// Iterate over the registered annotation parsers
for (TransactionAnnotationParser parser : this.annotationParsers) {
// Let the parser inspect the annotations on the element (method or class); the first non-null result wins
TransactionAttribute attr = parser.parseTransactionAnnotation(element);
if (attr != null) {
return attr;
}
}
return null;
}
}
1. AbsFallbackTrxAttributeSource
public abstract class AbstractFallbackTransactionAttributeSource implements TransactionAttributeSource {
// Cache of resolved attributes, keyed by the value returned from getCacheKey(method, targetClass)
private final Map<Object, TransactionAttribute> attributeCache = new ConcurrentHashMap<>(1024);
/**
 * Looks in the cache first; on a miss, computes the attribute, stores the result in the
 * cache (a null marker when nothing was found) and returns it, or {@code null}.
 * <p>
 * Determine the transaction attribute for this method invocation.
 * <p>Defaults to the class's transaction attribute if no method attribute is found.
 *
 * @param method the method for the current invocation (never {@code null})
 * @param targetClass the target class for this invocation (may be {@code null})
 * @return a TransactionAttribute for this method, or {@code null} if the method
 * is not transactional
 */
@Override
@Nullable
public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// Methods declared on Object are never transactional
if (method.getDeclaringClass() == Object.class) {
return null;
}
// First, see if we have a cached value.
// Build the cache key
Object cacheKey = getCacheKey(method, targetClass);
// Look it up in the cache
TransactionAttribute cached = this.attributeCache.get(cacheKey);
// Cache hit: computeTransactionAttribute is not re-run on every call
if (cached != null) {
// Value will either be canonical value indicating there is no transaction attribute,
// or an actual transaction attribute.
// Is the cached object the "no transaction attribute" marker?
if (cached == NULL_TRANSACTION_ATTRIBUTE) {
return null;
} else {
// A real attribute is cached: return it directly
return cached;
}
} else {
// We need to work it out.
// 1.. Resolve the transaction attribute
TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
// Put it in the cache.
// Nothing was resolved
if (txAttr == null) {
// Cache the "no transaction attribute" marker
this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
} else {
// Qualified method name used as the descriptor (per the original note: package + class + method)
String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
// Attach the method description to the transaction attribute
if (txAttr instanceof DefaultTransactionAttribute) {
((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
}
if (logger.isTraceEnabled()) {
logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
}
// Store in the cache
this.attributeCache.put(cacheKey, txAttr);
}
return txAttr;
}
}
/**
 * Core lookup of the transaction attribute.
 * <p>
 * Same signature as {@link #getTransactionAttribute}, but doesn't cache the result.
 * {@link #getTransactionAttribute} is effectively a caching decorator for this method.
 * <p>As of 4.1.8, this method can be overridden.
 *
 * @see #getTransactionAttribute
 * @since 4.1.8
 */
@Nullable
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// Don't allow no-public methods as required.
// Reject non-public methods when only public methods are allowed
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// The method may be on an interface, but we need attributes from the target class.
// If the target class is null, the method will be unchanged.
// method may be the interface method; specificMethod is the corresponding method on the target class
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// First try is the method in the target class.
// 1... (AnnotationTransactionAttributeSource) First preference: the attribute declared on the method itself; per the original note, superclass/interface methods are searched too
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// Second try is the transaction attribute on the target class.
// Otherwise try the class declaring that method; per the original note, superclasses/interfaces are searched too
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
// Only when specificMethod differs from the original method
if (specificMethod != method) {
// Fallback is to look at the original method.
// Look at the original (e.g. interface) method
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Last fallback is the class of the original method.
// Look at the type that declares the original method
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
}
2. SpringTrxAnnoParser
/**
 * Strategy implementation for parsing Spring's {@link Transactional} annotation.
 *
 * @author Juergen Hoeller
 * @since 2.5
 */
@SuppressWarnings("serial")
public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable {
@Override
public boolean isCandidateClass(Class<?> targetClass) {
return AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
}
@Override
@Nullable
public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
// Look up @Transactional on the element and wrap its attributes in an AnnotationAttributes instance
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
element, Transactional.class, false, false);
if (attributes != null) {
// Build the actual transaction attribute object
return parseTransactionAnnotation(attributes);
}
else {
return null;
}
}
// Convenience overload for an already-obtained annotation instance
public TransactionAttribute parseTransactionAnnotation(Transactional ann) {
return parseTransactionAnnotation(AnnotationUtils.getAnnotationAttributes(ann, false, false));
}
// Copies every @Transactional attribute into a RuleBasedTransactionAttribute
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
// Create a rule-based transaction attribute
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
// Propagation behavior from @Transactional
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
// Isolation level from @Transactional
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
// Transaction timeout from @Transactional
rbta.setTimeout(attributes.getNumber("timeout").intValue());
// readOnly flag
rbta.setReadOnly(attributes.getBoolean("readOnly"));
// Qualifier from the "value" attribute (per the original note: the transaction manager name)
rbta.setQualifier(attributes.getString("value"));
List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
// Exception classes that trigger a rollback
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
// Exception class names that trigger a rollback
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
// Exception classes that must not trigger a rollback
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
// Exception class names that must not trigger a rollback
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
}
3. enhanceConfigClasses()
- ConfigClass_full类都会被CGLIB增强(ConfigurationClassEnhancer生成代理子类)
invokeBDRPP()
进行ConfigClass的解析、组装
invokeBFPP()
进行ConfigClass的增强(代理)
ConfigurationClassPostProcessor#enhanceConfigurationClasses()
public class ConfigurationClassPostProcessor implements BeanDefinitionRegistryPostProcessor,
PriorityOrdered, ResourceLoaderAware, BeanClassLoaderAware, EnvironmentAware {
/**
 * Locates, loads, parses and registers the annotation-based bean definitions.
 * <p>
 * Derive further bean definitions from the configuration classes in the registry.
 */
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) {
// The registry's identity hash guards against processing it twice; a repeat call throws
int registryId = System.identityHashCode(registry);
if (this.registriesPostProcessed.contains(registryId)) {
throw new IllegalStateException(
"postProcessBeanDefinitionRegistry already called on this post-processor against " + registry);
}
if (this.factoriesPostProcessed.contains(registryId)) {
throw new IllegalStateException(
"postProcessBeanFactory already called on this post-processor against " + registry);
}
this.registriesPostProcessed.add(registryId);
// Process the configuration bean definitions
processConfigBeanDefinitions(registry);
}
// ------------------------------------------------------------------------------------------
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
int factoryId = System.identityHashCode(beanFactory);
if (this.factoriesPostProcessed.contains(factoryId)) {
throw new IllegalStateException(
"postProcessBeanFactory already called on this post-processor against " + beanFactory);
}
this.factoriesPostProcessed.add(factoryId);
if (!this.registriesPostProcessed.contains(factoryId)) {
// BeanDefinitionRegistryPostProcessor hook apparently not supported...
// Simply call processConfigurationClasses lazily at this point then.
processConfigBeanDefinitions((BeanDefinitionRegistry) beanFactory);
}
// 1.. Enhance (proxy) the configuration classes
enhanceConfigurationClasses(beanFactory);
beanFactory.addBeanPostProcessor(new ImportAwareBeanPostProcessor(beanFactory));
}
/**
 * Post-processes a BeanFactory in search of Configuration class BeanDefinitions;
 * any candidates are then enhanced by a {@link ConfigurationClassEnhancer}.
 * Candidate status is determined by BeanDefinition attribute metadata.
 *
 * @see ConfigurationClassEnhancer
 */
public void enhanceConfigurationClasses(ConfigurableListableBeanFactory beanFactory) {
Map<String, AbstractBeanDefinition> configBeanDefs = new LinkedHashMap<>();
// 1. Walk every bean definition
for (String beanName : beanFactory.getBeanDefinitionNames()) {
BeanDefinition beanDef = beanFactory.getBeanDefinition(beanName);
Object configClassAttr = beanDef.getAttribute(ConfigurationClassUtils.CONFIGURATION_CLASS_ATTRIBUTE);
MethodMetadata methodMetadata = null;
if (beanDef instanceof AnnotatedBeanDefinition) {
methodMetadata = ((AnnotatedBeanDefinition) beanDef).getFactoryMethodMetadata();
}
if ((configClassAttr != null || methodMetadata != null) && beanDef instanceof AbstractBeanDefinition) {
// Configuration class (full or lite) or a configuration-derived @Bean method
// -> resolve bean class at this point...
AbstractBeanDefinition abd = (AbstractBeanDefinition) beanDef;
if (!abd.hasBeanClass()) {
try {
abd.resolveBeanClass(this.beanClassLoader);
} catch (Throwable ex) {
throw new IllegalStateException(
"Cannot load configuration class: " + beanDef.getBeanClassName(), ex);
}
}
}
// 2. Only definitions whose configuration-class attribute is "full" are collected for enhancement
if (ConfigurationClassUtils.CONFIGURATION_CLASS_FULL.equals(configClassAttr)) {
if (!(beanDef instanceof AbstractBeanDefinition)) {
throw new BeanDefinitionStoreException("Cannot enhance @Configuration bean definition '" +
beanName + "' since it is not stored in an AbstractBeanDefinition subclass");
} else if (logger.isInfoEnabled() && beanFactory.containsSingleton(beanName)) {
logger.info("Cannot enhance @Configuration bean definition '" + beanName +
"' since its singleton instance has been created too early. The typical cause " +
"is a non-static @Bean method with a BeanDefinitionRegistryPostProcessor " +
"return type: Consider declaring such methods as 'static'.");
}
configBeanDefs.put(beanName, (AbstractBeanDefinition) beanDef);
}
}
if (configBeanDefs.isEmpty()) {
// nothing to enhance -> return immediately
return;
}
// 3. Create the enhancer and process every collected definition
ConfigurationClassEnhancer enhancer = new ConfigurationClassEnhancer();
for (Map.Entry<String, AbstractBeanDefinition> entry : configBeanDefs.entrySet()) {
AbstractBeanDefinition beanDef = entry.getValue();
// If a @Configuration class gets proxied, always proxy the target class
beanDef.setAttribute(AutoProxyUtils.PRESERVE_TARGET_CLASS_ATTRIBUTE, Boolean.TRUE);
// Set enhanced subclass of the user-specified bean class
Class<?> configClass = beanDef.getBeanClass();
// 4. Generate the enhanced class; the bean definition is switched to it only when it differs
Class<?> enhancedClass = enhancer.enhance(configClass, this.beanClassLoader);
if (configClass != enhancedClass) {
if (logger.isTraceEnabled()) {
logger.trace(String.format("Replacing bean definition '%s' existing class '%s' with " +
"enhanced class '%s'", entry.getKey(), configClass.getName(), enhancedClass.getName()));
}
beanDef.setBeanClass(enhancedClass);
}
}
}
}
@Bean
方法a()
调用了b()
。生成代理对象,当调用b()
时,代理类判断b是否第一次被new
- 如果是,则代理对象调用
proxyMethod.invokeSuper()
调用父类的当前方法进行创建b
- 否则根据代理类的
$$BeanFactory.getBean()
去获取b对象,b只被创建一次,即为singleton
- 如果是,则代理对象调用
@Configuration
public class ConfigClass {
@Bean
public A a() {
b()
return new A();
}
@Bean
public B b() {
return new b();
}
}
4. txDao.updDao()
TxService
,TxDao
皆为代理对象
<tx:attributes>
<!-- Propagation options: REQUIRED, REQUIRES_NEW, MANDATORY, NESTED, NEVER, NOT_SUPPORTED, SUPPORTS -->
<!-- Both methods use the DEFAULT isolation level and REQUIRED propagation -->
<tx:method name="updService" isolation="DEFAULT" propagation="REQUIRED"/>
<tx:method name="updDao" isolation="DEFAULT" propagation="REQUIRED"/>
</tx:attributes>
1. CglibAopProxy
class CglibAopProxy implements AopProxy, Serializable {
/**
* General purpose AOP callback. Used when the target is dynamic or when the
* proxy is not frozen.
*/
private static class DynamicAdvisedInterceptor implements MethodInterceptor, Serializable {
// Proxy configuration consulted on every call: target source, exposeProxy flag, interceptor chain.
private final AdvisedSupport advised;
public DynamicAdvisedInterceptor(AdvisedSupport advised) {
this.advised = advised;
}
/**
* Handles a call made on the proxy: resolves the target, builds the interceptor
* chain for the method, and either invokes the target directly (empty chain and
* public method) or runs the chain through a CglibMethodInvocation.
*
* @param proxy the proxy object
* @param method the proxied method
* @param args the call arguments
* @param methodProxy the method proxy used to invoke the target
* @return the return value after processReturnType has been applied
* @throws Throwable whatever the target or the interceptor chain throws
*/
@Override
@Nullable
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
Object oldProxy = null;
boolean setProxyContext = false;
Object target = null;
// 1. TargetSource wrapping the target object (MyCalc in the notes' running example)
TargetSource targetSource = this.advised.getTargetSource();
try {
// 2. exposeProxy: publish this proxy via AopContext (the notes relate this to whether
// calls between two methods of the same target are both advised)
if (this.advised.exposeProxy) {
// Make invocation available if necessary.
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}
// Get as late as possible to minimize the time we "own" the target, in case it comes from a pool...
target = targetSource.getTarget(); // MyCalc
Class<?> targetClass = (target != null ? target.getClass() : null);
// 3. Fetch the configured AOP advice from advised (ProxyFactory)
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
Object retVal;
// Check whether we only have one InvokerInterceptor: that is,
// no real advice, but just reflective invocation of the target.
// An empty chain means the original method is invoked directly
if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
// We can skip creating a MethodInvocation: just invoke the target directly.
// Note that the final invoker must be an InvokerInterceptor, so we know
// it does nothing but a reflective operation on the target, and no hot
// swapping or fancy proxying.
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = methodProxy.invoke(target, argsToUse);
} else {
// 4.. CglibMethodInvocation We need to create a method invocation...
retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
}
// 5. Post-process the return value
retVal = processReturnType(proxy, target, method, retVal);
return retVal;
} finally {
// Hand a non-static target back to its TargetSource
if (target != null && !targetSource.isStatic()) {
targetSource.releaseTarget(target);
}
if (setProxyContext) {
// Restore old proxy.
AopContext.setCurrentProxy(oldProxy);
}
}
}
}
// -----------------------------------------------------------------------------
/**
* Implementation of AOP Alliance MethodInvocation used by this AOP proxy.
*/
private static class CglibMethodInvocation extends ReflectiveMethodInvocation {
// Method proxy used by invokeJoinpoint(); when null the superclass implementation is used instead.
@Nullable
private final MethodProxy methodProxy;
/**
* Runs the interceptor chain via the superclass. Runtime exceptions and checked
* exceptions declared by the method are rethrown unchanged; any other checked
* exception is wrapped in an UndeclaredThrowableException.
*/
@Override
@Nullable
public Object proceed() throws Throwable {
try {
// 1... ReflectiveMethodInvocation
return super.proceed();
} catch (RuntimeException ex) {
throw ex;
} catch (Exception ex) {
if (ReflectionUtils.declaresException(getMethod(), ex.getClass())) {
throw ex;
} else {
throw new UndeclaredThrowableException(ex);
}
}
}
/**
* Gives a marginal performance improvement versus using reflection to
* invoke the target when invoking public methods.
*/
@Override
protected Object invokeJoinpoint() throws Throwable {
if (this.methodProxy != null) {
// 1. Invoke the method on the proxied (target) object
return this.methodProxy.invoke(this.target, this.arguments);
} else {
return super.invokeJoinpoint();
}
}
}
}
1. ReflectiveMethodInvocation
/**
* Method invocation that walks a list of interceptors one by one and finally
* invokes the target method (the joinpoint) once the list is exhausted.
*/
public class ReflectiveMethodInvocation implements ProxyMethodInvocation, Cloneable {
protected final Object proxy;
private final Class<?> targetClass;
protected final Method method;
protected final Object target; // original (target) object
protected Object[] arguments; // actual arguments
// chain
protected final List<?> interceptorsAndDynamicMethodMatchers;
// Index of the interceptor most recently taken from the chain; -1 means none yet.
private int currentInterceptorIndex = -1;
protected ReflectiveMethodInvocation(
Object proxy, @Nullable Object target, Method method, @Nullable Object[] arguments,
@Nullable Class<?> targetClass, List<Object> interceptorsAndDynamicMethodMatchers) {
this.proxy = proxy; // proxy object
this.target = target; // proxied (target) object
this.targetClass = targetClass; // proxied (target) class
this.method = BridgeMethodResolver.findBridgedMethod(method); // proxied method
this.arguments = AopProxyUtils.adaptArgumentsIfNecessary(method, arguments); // actual arguments
this.interceptorsAndDynamicMethodMatchers = interceptorsAndDynamicMethodMatchers; // chain
}
/**
* Recursively fetches the next advice from the chain and executes it; once the
* chain is exhausted the joinpoint itself is invoked.
*/
@Override
@Nullable
public Object proceed() throws Throwable {
// We start with an index of -1 and increment early.
// 1... CglibMethodInvocation: every interceptor has run, so invoke the target method
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}
// 2. Next node in the chain
Object interceptorOrInterceptionAdvice =
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
// Evaluate dynamic method matcher here: static part will already have
// been evaluated and found to match.
InterceptorAndDynamicMethodMatcher dm =
(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
return dm.interceptor.invoke(this);
}
// No match: proceed() is called recursively until every interceptor has been visited
else {
// Dynamic matching failed.
// Skip this interceptor and invoke the next in the chain.
return proceed();
}
}
// 3... TransactionInterceptor. Plain interceptor: invoke it directly, passing this
// (a CglibMethodInvocation in this trace) as the argument
else {
// It's an interceptor, so we just invoke it: The pointcut will have
// been evaluated statically before this object was constructed.
return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
}
/**
* Invoke the joinpoint using reflection.
* Subclasses can override this to use custom invocation.
*
* @return the return value of the joinpoint
* @throws Throwable if invoking the joinpoint resulted in an exception
*/
@Nullable
protected Object invokeJoinpoint() throws Throwable {
// this.target    the target object
// this.method    the target method
// this.arguments the arguments for the target method
return AopUtils.invokeJoinpointUsingReflection(this.target, this.method, this.arguments);
}
}
2. TransactionInterceptor
`invocation::proceed` 作为方法引用传入
/**
* AOP Alliance interceptor that hands each intercepted call to
* invokeWithinTransaction, inherited from TransactionAspectSupport.
*/
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
/**
* Works out the target class and runs the rest of the invocation inside
* invokeWithinTransaction, passing {@code invocation::proceed} as the callback.
*
* @param invocation the method invocation being intercepted
* @return whatever invokeWithinTransaction returns
* @throws Throwable propagated from invokeWithinTransaction
*/
public Object invoke(MethodInvocation invocation) throws Throwable {
// Target class (not the proxy class), derived from invocation.getThis(); null when getThis() returns null
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 1... TransactionAspectSupport. Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}
- determineTransactionManager() => DataSourceTransactionManager
- createTransactionIfNecessary() => TransactionAspectSupport$TransactionInfo
  - tm.getTransaction(txAttr);
  - prepareTransactionInfo(); => new TransactionInfo();
- invocation.proceedWithInvocation(); => 业务方法执行
- commitTransactionAfterReturning(txInfo); => 提交Tx
  - txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
- completeTransactionAfterThrowing()
  - txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
1. TransactionAspectSupport
147 status = tm.getTransaction(txAttr); => AbstractPlatformTransactionManager#getTransaction()
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
// NameMatchTransactionAttributeSource
@Nullable
private TransactionAttributeSource transactionAttributeSource;
// DataSourceTransactionManager
@Nullable
private TransactionManager transactionManager;
/**
* Returns the transaction attribute source held by this aspect.
*
* @return the configured source, or null if none has been set
*/
@Nullable
public TransactionAttributeSource getTransactionAttributeSource() {
return this.transactionAttributeSource;
}
/**
* Runs the given callback under the transaction settings that apply to the method.
* Resolves the transaction attribute and the transaction manager, hands reactive
* managers over to ReactiveTransactionSupport, and otherwise creates a
* TransactionInfo, invokes the callback, completes the transaction if a throwable
* escapes, always cleans up the TransactionInfo, and commits on normal return.
* The callback-preferring branch is elided in this excerpt.
*
* @param method the method being invoked
* @param targetClass the target class, may be null
* @param invocation callback that proceeds with the target invocation
* @return the callback's return value
* @throws Throwable propagated from the callback or from transaction completion
*/
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
// 1.. Transaction attribute source (NameMatchTransactionAttributeSource, AnnotationTransactionAttributeSource)
TransactionAttributeSource tas = getTransactionAttributeSource();
// 2. Transaction attributes of the current method (e.g. PROPAGATION_REQUIRED, ISOLATION_DEFAULT)
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 3.. **Transaction manager configured in XML: DataSourceTransactionManager**
final TransactionManager tm = determineTransactionManager(txAttr);
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
throw new TransactionUsageException(
"Unsupported annotated transaction on suspending function detected: " + method +
". Use TransactionalOperator.transactional extensions instead.");
}
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
method.getReturnType());
}
return new ReactiveTransactionSupport(adapter);
});
return txSupport.invokeWithinTransaction(
method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
// Unique identifier of the joinpoint (class name + method name), e.g. com.listao.tx.TxService.updService
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// Declarative transaction handling
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 4.. Create the TxInfo `TransactionAspectSupport$TransactionInfo`
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 5. CglibMethodInvocation. Call back into the chain, which runs the proxied business method
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// target invocation exception
// 6.. Complete the transaction after an exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
// 7.. Clear the transaction info, restoring the thread's oldTransactionInfo
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 8.. Tx_commit => 1. persist resources 2. release the connection 3. resume what was suspended
commitTransactionAfterReturning(txInfo);
return retVal;
}
// Callback-preferring manager branch (labelled "programmatic transaction handling" in the notes; body elided)
else {
// ...
}
}
// ------------------------------------------------------------------------------------------------
/**
* Determine the specific transaction manager to use for the given transaction.
* Lookup order: the attribute's qualifier, then the configured manager bean name,
* then the directly configured manager, then the cached default, and finally a
* by-type bean factory lookup whose result is cached.
*
* @param txAttr the transaction attribute of the current method, may be null
* @return the transaction manager to use
*/
@Nullable
protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
// Do not attempt to lookup tx manager if no tx attributes are set
if (txAttr == null || this.beanFactory == null) {
return getTransactionManager();
}
String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier)) {
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
} else if (StringUtils.hasText(this.transactionManagerBeanName)) {
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
} else {
// 1. XML configuration => DataSourceTransactionManager
TransactionManager defaultTransactionManager = getTransactionManager();
if (defaultTransactionManager == null) {
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
if (defaultTransactionManager == null) {
// 2. Annotation configuration: look the manager up by type, then cache it
defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
this.transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
}
return defaultTransactionManager;
}
}
// ------------------------------------------------------------------------------------------------
/**
 * Creates the TxInfo for a joinpoint, obtaining a transaction status from the
 * transaction manager when both an attribute and a manager are available.
 *
 * @param tm the transaction manager, may be null
 * @param txAttr the transaction attribute, may be null (non-transactional method)
 * @param joinpointIdentification fully qualified method name, used as the
 *        transaction name when the attribute does not carry one
 * @return the TransactionInfo produced by prepareTransactionInfo
 */
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr,
        final String joinpointIdentification) {
    // 1. No explicit name: wrap the attribute so that getName() reports the
    //    joinpoint identification, e.g. com.listao.tx.TxDao.updDao
    TransactionAttribute effectiveAttr = txAttr;
    if (effectiveAttr != null && effectiveAttr.getName() == null) {
        effectiveAttr = new DelegatingTransactionAttribute(effectiveAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus txStatus = null;
    if (effectiveAttr != null && tm != null) {
        // 2... Tx_Status => AbstractPlatformTransactionManager#getTransaction()
        txStatus = tm.getTransaction(effectiveAttr);
    } else if (effectiveAttr != null && logger.isDebugEnabled()) {
        logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                "] because no transaction manager has been configured");
    }

    // 3.. Attribute + Tx_Status => Tx_Info
    return prepareTransactionInfo(tm, effectiveAttr, joinpointIdentification, txStatus);
}
/**
* Builds a TransactionInfo for the joinpoint, records the given status on it when
* the method is transactional, and binds the info to the current thread.
*
* @param tm the transaction manager, may be null
* @param txAttr the transaction attribute, may be null (non-transactional method)
* @param joinpointIdentification fully qualified method name, used for logging
* @param status the transaction status for the current transaction, may be null
* @return the thread-bound TransactionInfo
*/
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
// 1. Create the transaction info
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// We need a transaction for this method...
if (logger.isTraceEnabled()) {
logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// The transaction manager will flag an error if an incompatible tx already exists.
// 2. Record the new transaction status
txInfo.newTransactionStatus(status);
} else {
// The TransactionInfo.hasTransaction() method will return false. We created it only
// to preserve the integrity of the ThreadLocal stack maintained in this class.
if (logger.isTraceEnabled()) {
logger.trace("No need to create transaction for [" + joinpointIdentification +
"]: This method is not transactional.");
}
}
// We always bind the TransactionInfo to the thread, even if we didn't create
// a new transaction here. This guarantees that the TransactionInfo stack
// will be managed correctly even if no transaction was created by this aspect.
// 3. Bind the transaction info to the current thread
txInfo.bindToThread();
return txInfo;
}
// ------------------------------------------------------------------------------------------------
/**
 * Completes the transaction after the target invocation threw.
 * Rolls back when the transaction attribute says the throwable triggers a
 * rollback, and commits otherwise. Does nothing when there is no TransactionInfo
 * or it carries no transaction status.
 *
 * @param txInfo information about the current transaction
 * @param ex throwable encountered
 */
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    if (txInfo == null || txInfo.getTransactionStatus() == null) {
        return;
    }
    if (logger.isTraceEnabled()) {
        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                "] after exception: " + ex);
    }

    final boolean rollingBack =
            (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex));
    // Both completion paths report a failure the same way; only the wording differs.
    final String overriddenBy = (rollingBack
            ? "Application exception overridden by rollback exception"
            : "Application exception overridden by commit exception");

    try {
        if (rollingBack) {
            // 1... Roll back
            txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
        } else {
            // 2... We don't roll back on this exception.
            // Will still roll back if TransactionStatus.isRollbackOnly() is true.
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
    } catch (TransactionSystemException ex2) {
        logger.error(overriddenBy, ex);
        ex2.initApplicationException(ex);
        throw ex2;
    } catch (RuntimeException | Error ex2) {
        logger.error(overriddenBy, ex);
        throw ex2;
    }
}
/**
 * Asks the given TransactionInfo to restore the thread-local status.
 * A null argument is ignored.
 *
 * @param txInfo information about the current transaction, may be null
 */
protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
    if (txInfo == null) {
        return;
    }
    // 1.
    txInfo.restoreThreadLocalStatus();
}
/**
 * Commits after the target invocation returned normally.
 * Does nothing when there is no TransactionInfo or it carries no transaction status.
 *
 * @param txInfo information about the current transaction, may be null
 */
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo == null || txInfo.getTransactionStatus() == null) {
        return;
    }
    if (logger.isTraceEnabled()) {
        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
    }
    // 1... AbstractPlatformTransactionManager
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
// ------------------------------------------------------------------------------------
/**
* Simple callback interface for proceeding with the target invocation.
* Concrete interceptors/aspects adapt this to their invocation mechanism.
*/
@FunctionalInterface
protected interface InvocationCallback {
/**
* Proceeds with the invocation this callback wraps.
*
* @return the invocation's return value, may be null
* @throws Throwable anything thrown by the invocation
*/
@Nullable
Object proceedWithInvocation() throws Throwable;
}
// ------------------------------------------------------------------------------------
/**
* Holder for per-invocation transaction information (field-only excerpt).
*/
protected static final class TransactionInfo {
// Transaction manager in charge of this invocation, may be null
@Nullable
private final PlatformTransactionManager transactionManager;
// Transaction attribute of the method; null means the method is non-transactional
@Nullable
private final TransactionAttribute transactionAttribute;
// Joinpoint identifier used in log messages
private final String joinpointIdentification;
// Status of the transaction for this invocation, if any
@Nullable
private TransactionStatus transactionStatus;
// Presumably the TransactionInfo that was bound to the thread before this one; TODO confirm
@Nullable
private TransactionInfo oldTransactionInfo;
}
}
2. DataSourceTrxManager
- TxService_Tx创建。关闭自动提交,自动开启事务
doGetTransaction()
isExistingTransaction()
doBegin()
doCommit()
doRollback()
doResume()
doSuspend()
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
@Nullable
private DataSource dataSource;
/**
* Creates the transaction object for the current call, attaching whatever
* ConnectionHolder is currently bound for this data source (possibly none).
*
* @return a new DataSourceTransactionObject
*/
@Override
protected Object doGetTransaction() {
// 1. Data-source transaction object
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// Savepoint setting: allowed whenever nested transactions are allowed
txObject.setSavepointAllowed(isNestedTransactionAllowed());
/*
* - The transaction synchronization manager (its state is thread-local) holds the current transaction info
* - On the first lookup, the ConnectionHolder fetched by the data-source key is null
*/
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// 2. Not a newly created connection holder (newConnectionHolder = false)
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
/**
 * Reports whether the given transaction object represents a transaction that
 * is already running.
 *
 * @param transaction the transaction object returned by doGetTransaction
 * @return true only when a connection holder is present and its transaction is active
 */
@Override
protected boolean isExistingTransaction(Object transaction) {
    final DataSourceTransactionObject dsTxObject = (DataSourceTransactionObject) transaction;
    // 1. On the first entry no connection holder has been attached yet, so
    //    hasConnectionHolder() is false and no transaction exists
    if (!dsTxObject.hasConnectionHolder()) {
        return false;
    }
    return dsTxObject.getConnectionHolder().isTransactionActive();
}
// --------------------------------------------------------------------------------------------------
/**
* Opens the connection and begins the transaction.
*
* @param transaction the transaction object returned by {@code doGetTransaction}
* @param definition a TransactionDefinition instance, describing propagation
* behavior, isolation level, read-only flag, timeout, and transaction name
* @throws CannotCreateTransactionException if any step of the setup fails
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
// 1. Cast the transaction object
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// The transaction object has no connection holder yet (or its holder is already synchronized with a transaction)
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// Obtain a database Connection from the data source (Druid in this setup)
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// Wrap the connection in a ConnectionHolder and store it on txObject as a new holder
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// Mark the holder as synchronized with a transaction
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 2. Apply the isolation level
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 3. Record the read-only flag
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
// 4. Turn off auto-commit on the DB connection; commits are now managed by Spring_tx
if (con.getAutoCommit()) {
// Remember that auto-commit must be restored afterwards
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
// Connection preparation hook (per the notes: checks whether a read-only transaction is to be set up)
prepareTransactionalConnection(con, definition);
// Mark the transaction as active
txObject.getConnectionHolder().setTransactionActive(true);
// 5. Transaction timeout
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
// Bind (key: DruidDataSource, value: ConnectionHolder) to the synchronization manager, ThreadLocal<Map<Object, Object>>
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
// Release the database connection
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
// --------------------------------------------------------------------------------------------------
/**
 * Commits on the JDBC connection held by the transaction.
 *
 * @param status the status representation of the transaction
 * @throws TransactionSystemException if the JDBC commit fails
 */
@Override
protected void doCommit(DefaultTransactionStatus status) {
    // 1. Unwrap the JDBC connection held by the transaction object
    final Connection jdbcConnection =
            ((DataSourceTransactionObject) status.getTransaction()).getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + jdbcConnection + "]");
    }
    try {
        // 2. JDBC_conn commit
        jdbcConnection.commit();
    } catch (SQLException commitFailure) {
        throw new TransactionSystemException("Could not commit JDBC transaction", commitFailure);
    }
}
// --------------------------------------------------------------------------------------------------
/**
 * Rolls back on the JDBC connection held by the transaction.
 *
 * @param status the status representation of the transaction
 * @throws TransactionSystemException if the JDBC rollback fails
 */
@Override
protected void doRollback(DefaultTransactionStatus status) {
    // 1. Unwrap the JDBC connection held by the transaction object
    final Connection jdbcConnection =
            ((DataSourceTransactionObject) status.getTransaction()).getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + jdbcConnection + "]");
    }
    try {
        // 2. jdbc_rollback
        jdbcConnection.rollback();
    } catch (SQLException rollbackFailure) {
        throw new TransactionSystemException("Could not roll back JDBC transaction", rollbackFailure);
    }
}
// --------------------------------------------------------------------------------------------------
/**
* Binds the suspended resources (the connection holder, per the notes) back to
* the data source in the thread-bound resources.
*
* @param transaction the transaction object returned by {@code doGetTransaction}
* @param suspendedResources the object that holds suspended resources,
* as returned by doSuspend
*/
@Override
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
// --------------------------------------------------------------------------------------------------
/**
 * Suspends the transaction's resources: detaches the connection holder from the
 * transaction object and unbinds the data source's resource.
 *
 * @param transaction the transaction object returned by doGetTransaction
 * @return the resource that was unbound, to be handed back to doResume later
 */
@Override
protected Object doSuspend(Object transaction) {
    // 1. Clear the connection holder on the transaction object
    ((DataSourceTransactionObject) transaction).setConnectionHolder(null);
    // 2. Unbind the thread-private resource
    final Object unboundResource = TransactionSynchronizationManager.unbindResource(obtainDataSource());
    return unboundResource;
}
// --------------------------------------------------------------------------------------------------
/**
* Marks the transaction held by the given status as rollback-only.
*
* @param status the status representation of the transaction
*/
@Override
protected void doSetRollbackOnly(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
"] rollback-only");
}
// 1. Set the rollback flag: `ResourceHolderSupport.rollbackOnly = true;`
txObject.setRollbackOnly();
}
}
DataSourceTransactionManager#doBegin()
1. AbsPlatformTrxManager
getTransaction()
- ->
doGetTransaction()
- ->
isExistingTransaction()
handleExistingTransaction()
startTransaction()
- ->
doBegin()
- ->
- ->
commit()
processCommit()
- ->
doCommit()
- ->
processRollback()
- ->
doRollback()
- ->
resume()
- ->
doResume()
- ->
suspend()
- ->
doSuspend()
- ->
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
/**
* This implementation handles propagation behavior. Delegates to
* {@code doGetTransaction}, {@code isExistingTransaction}
* and {@code doBegin}.
*
* @param definition the transaction definition, or null to use the defaults
* @return the status object for the new or existing transaction
* @throws TransactionException on an invalid timeout, or when propagation is
* MANDATORY and no transaction exists
* @see #doGetTransaction
* @see #isExistingTransaction
* @see #doBegin
*/
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 1... DataSourceTransactionManager -> obtain the transaction object
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 2... DataSourceTransactionManager -> does the current thread already have a Tx?
// (connection holder present && its transactionActive flag set)
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
// 3..
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
// 4. Validate the Tx timeout
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// (MANDATORY)
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 5.1. new_Tx (REQUIRED, REQUIRES_NEW, NESTED)
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 5.2. There is no curr_Tx, so a null transaction is suspended
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 5.3..
return startTransaction(def, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error ex) {
// 5.4.. Resume what was suspended
resume(null, suspendedResources);
throw ex;
}
}
// 6.1. null_Tx (SUPPORTS, NOT_SUPPORTED, NEVER)
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
// Create an empty Tx
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 6.2..
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
/**
* Create a TransactionStatus for an existing transaction.
* Branches on the propagation behavior: NEVER throws, NOT_SUPPORTED suspends and
* continues without a transaction, REQUIRES_NEW suspends and starts a new
* transaction, NESTED uses a savepoint or a nested begin, and anything else
* participates in the existing transaction.
*
* @param definition the transaction definition of the current call
* @param transaction the transaction object returned by doGetTransaction
* @param debugEnabled whether debug logging is enabled
* @return the status object for the current call
* @throws TransactionException when the propagation behavior conflicts with the existing transaction
*/
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 1. NEVER: throw
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// 2.1. NOT_SUPPORTED: suspend the outer transaction
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 2.2.. Suspend the current transaction
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 2.3.. Create a new non-transactional status (it keeps the suspended resources of the previous transaction)
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 3.1. REQUIRES_NEW: suspend curr_Tx and start a new Tx
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 3.2..
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 3.3.. Create a new Tx_Status that keeps the SuspendedResourcesHolder (suspended resources)
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
} catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// 4.1. NESTED: nested transaction
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// Throw if nested transactions are not allowed
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
// 4.2..
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 4.3. Tx_Savepoint
status.createAndHoldSavepoint();
return status;
} else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
// 4.4.. Savepoints cannot be used in some setups
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// Optionally validate curr_Tx, then carry on inside it
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
// 5.
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
// --------------------------------------------------------------------------------------------------
/**
* Start a new transaction.
* The status is created first, then doBegin is called, and finally the
* synchronization is prepared.
*
* @param definition the transaction definition of the current call
* @param transaction the transaction object returned by doGetTransaction
* @param debugEnabled whether debug logging is enabled
* @param suspendedResources resources suspended for this transaction, may be null
* @return the status of the newly started transaction
*/
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
// Whether new synchronization is wanted (true in this trace)
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 1.. Create a new Tx_status
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 2... DataSourceTransactionManager -> acquire the Connection and turn off AutoCommit, i.e. the Tx begins here
doBegin(transaction, definition);
// Set up the properties of the TransactionSynchronizationManager
prepareSynchronization(status, definition);
return status;
}
/**
 * Create a {@code DefaultTransactionStatus} for the given arguments.
 *
 * <p>The status is marked as carrying a new synchronization only when one was
 * requested AND no synchronization is currently active for this thread.
 *
 * @param definition         the transaction definition (supplies the read-only flag)
 * @param transaction        the underlying transaction object, or {@code null}
 * @param newTransaction     whether the status represents a new transaction
 * @param newSynchronization whether a new synchronization was requested
 * @param debug              debug flag stored on the status
 * @param suspendedResources suspended resources to keep with the status, or {@code null}
 * @return the freshly created status object
 */
protected DefaultTransactionStatus newTransactionStatus(
        TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
        boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
    // Only consult the synchronization manager when a new synchronization was requested.
    boolean activateSynchronization = false;
    if (newSynchronization) {
        activateSynchronization = !TransactionSynchronizationManager.isSynchronizationActive();
    }

    // 1.
    final boolean readOnly = definition.isReadOnly();
    return new DefaultTransactionStatus(
            transaction, newTransaction, activateSynchronization, readOnly, debug, suspendedResources);
}
// --------------------------------------------------------------------------------------------------
/**
 * Commit entry point: decides between rolling back and committing.
 *
 * <p>A status that is already completed is rejected. A status marked
 * local rollback-only is rolled back as an expected rollback. A status marked
 * global rollback-only is rolled back as an unexpected rollback, unless
 * {@code shouldCommitOnGlobalRollbackOnly()} is true. Otherwise the commit is processed.
 *
 * @param status the status of the transaction to commit
 * @throws IllegalTransactionStateException if the status is already completed
 */
@Override
public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    final DefaultTransactionStatus txStatus = (DefaultTransactionStatus) status;

    if (txStatus.isLocalRollbackOnly()) {
        // The transactional code itself asked for a rollback.
        if (txStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        // 1.. Expected rollback (unexpected = false).
        processRollback(txStatus, false);
    } else if (!shouldCommitOnGlobalRollbackOnly() && txStatus.isGlobalRollbackOnly()) {
        // Global rollback-only marker is set although the caller requested a commit.
        if (txStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        // 2.. Unexpected rollback (unexpected = true); may raise UnexpectedRollbackException.
        processRollback(txStatus, true);
    } else {
        // 3.. Regular commit processing.
        processCommit(txStatus);
    }
}
/**
 * Runs the commit sequence for the given status.
 *
 * <p>Order of work: pre-commit hook, beforeCommit and beforeCompletion callbacks,
 * then either release of a held savepoint, an actual {@code doCommit} for a new
 * transaction, or (for a participating transaction) only the optional fail-early
 * check. Afterwards the afterCommit and afterCompletion callbacks are fired and,
 * in every case, {@code cleanupAfterCompletion} runs in the outer {@code finally}.
 *
 * @param status the status of the transaction being committed
 * @throws UnexpectedRollbackException if the global rollback-only marker was set
 * @throws TransactionException if committing fails
 */
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
// Tracks whether beforeCompletion callbacks already ran, so the error path does not skip them.
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
// Pre-commit preparation hook.
prepareForCommit(status);
// Fire the beforeCommit callbacks (per these notes: the registered TransactionSynchronization objects).
triggerBeforeCommit(status);
// Fire the beforeCompletion callbacks.
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// 1. A savepoint is held: release it instead of committing.
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
// Remember whether the global rollback-only marker is set.
unexpectedRollback = status.isGlobalRollbackOnly();
// Release the held savepoint.
status.releaseHeldSavepoint();
}
// 2. This status owns a new transaction (DefaultTransactionStatus.isNewTransaction()).
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 3... Delegate the real commit to the concrete manager (DataSourceTransactionManager in these notes).
doCommit(status);
} else if (isFailEarlyOnGlobalRollbackOnly()) {
// Participating transaction: only check the marker when fail-early is enabled.
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
// Raised by doCommit or by the explicit rollback-only check just above.
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
} else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException | Error ex) {
// Make sure beforeCompletion callbacks run even if the failure happened before them.
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
// Any other failure during commit: roll back, then rethrow.
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
// Fire the afterCommit callbacks.
triggerAfterCommit(status);
} finally {
// Always fire afterCompletion with STATUS_COMMITTED, even if an afterCommit callback threw.
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
// 4.. Final cleanup: mark completed, clear synchronization state, release resources,
// and resume any suspended transaction (see cleanupAfterCompletion).
cleanupAfterCompletion(status);
}
}
/**
 * Post-completion housekeeping for a finished transaction status.
 *
 * <p>Marks the status completed, clears the thread's synchronization state if
 * this status created it, lets the concrete manager clean up if this status
 * owns a new transaction, and finally resumes any resources that were
 * suspended when this transaction began.
 *
 * @param status the status of the transaction that just completed
 */
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    // Flag the status as completed first.
    status.setCompleted();

    // Clear the synchronization state only if this status opened it.
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.clear();
    }

    // Resource cleanup is delegated to the concrete manager. Per these notes this unbinds
    // thread-bound resources, resets auto-commit / isolation / read-only and releases the connection.
    if (status.isNewTransaction()) {
        doCleanupAfterCompletion(status.getTransaction());
    }

    // Nothing was suspended on the way in: done.
    if (status.getSuspendedResources() == null) {
        return;
    }

    if (status.isDebug()) {
        logger.debug("Resuming suspended transaction after completion of inner transaction");
    }
    Object txObject = null;
    if (status.hasTransaction()) {
        txObject = status.getTransaction();
    }
    // 1.. Restore what was suspended.
    resume(txObject, (SuspendedResourcesHolder) status.getSuspendedResources());
}
// --------------------------------------------------------------------------------------------------
/**
 * Resume whatever the given holder captured at suspension time.
 *
 * <p>Suspended resources, if any, are handed back to the concrete manager via
 * {@code doResume}. If synchronizations were suspended as well, the per-thread
 * transaction attributes (active flag, isolation level, read-only flag, name)
 * are restored from the holder before the synchronizations are resumed.
 *
 * @param transaction     the current transaction object, or {@code null}
 * @param resourcesHolder the holder produced on suspension, or {@code null} (then a no-op)
 * @throws TransactionException propagated from the resume operations
 */
protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
        throws TransactionException {
    if (resourcesHolder == null) {
        return;
    }

    // 1. Hand suspended resources back to the concrete manager.
    final Object heldResources = resourcesHolder.suspendedResources;
    if (heldResources != null) {
        // 2... DataSourceTransactionManager in these notes
        doResume(transaction, heldResources);
    }

    final List<TransactionSynchronization> heldSynchronizations = resourcesHolder.suspendedSynchronizations;
    if (heldSynchronizations == null) {
        return;
    }

    // Restore the per-thread attributes of the suspended transaction, then its synchronizations.
    TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
    TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
    TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
    TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
    doResumeSynchronization(heldSynchronizations);
}
// --------------------------------------------------------------------------------------------------
/**
 * Rollback entry point; also covers participation in an existing transaction.
 * The actual work is done by {@code processRollback}, which in turn relies on
 * {@code doRollback} and {@code doSetRollbackOnly}.
 *
 * @param status the status of the transaction to roll back
 * @throws IllegalTransactionStateException if the status is already completed
 * @see #doRollback
 * @see #doSetRollbackOnly
 */
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
    final boolean alreadyCompleted = status.isCompleted();
    if (alreadyCompleted) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }
    // 1.. An explicitly requested rollback is never "unexpected".
    processRollback((DefaultTransactionStatus) status, false);
}
/**
 * Runs the rollback sequence for the given status.
 *
 * <p>After the beforeCompletion callbacks, one of three paths is taken: roll back
 * to a held savepoint, roll back a new transaction via {@code doRollback}, or -
 * when merely participating in a larger transaction - mark that transaction
 * rollback-only (if configured to) and leave the real rollback to its originator.
 * {@code cleanupAfterCompletion} always runs in the outer {@code finally}.
 *
 * @param status     the status of the transaction being rolled back
 * @param unexpected whether the caller considers this rollback unexpected
 * @throws UnexpectedRollbackException if the rollback is still flagged as unexpected at the end
 */
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
// Local copy of the "unexpected" flag; may be cleared in the participation branch below.
boolean unexpectedRollback = unexpected;
try {
// Fire the beforeCompletion callbacks.
triggerBeforeCompletion(status);
// 2.1. A savepoint is held: roll back to it.
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
// 2.2.
status.rollbackToHeldSavepoint();
}
// 1.1. This status owns a new transaction.
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 1.2... Delegate the real rollback to the concrete manager (DataSourceTransactionManager in these notes).
doRollback(status);
} else {
// Participating in larger transaction
if (status.hasTransaction()) {
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as " +
"rollback-only");
}
// 3.. Mark the existing transaction rollback-only (per these notes: the flag lives on the connection holder).
doSetRollbackOnly(status);
} else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on " +
"rollback");
}
}
} else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (RuntimeException | Error ex) {
// The rollback itself failed: outcome is unknown to the callbacks.
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// Fire the afterCompletion callbacks with STATUS_ROLLED_BACK.
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
// 4.
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
} finally {
// Final cleanup: mark completed, clear synchronization state, release resources,
// and resume any suspended transaction (see cleanupAfterCompletion).
cleanupAfterCompletion(status);
}
}
// --------------------------------------------------------------------------------------------------
/**
 * Suspends the given transaction and/or the current thread's synchronization state.
 *
 * <p>When synchronization is active, the synchronizations are suspended first, then
 * the transaction's resources (if a transaction is given), and the per-thread
 * transaction attributes (name, read-only flag, isolation level, active flag) are
 * captured and reset. Everything captured is returned in a holder so it can be
 * restored later.
 *
 * @param transaction the current transaction object, or {@code null}
 * @return a holder with the suspended state, or {@code null} if neither a
 * transaction nor a synchronization was active
 * @throws TransactionException propagated from the suspend operations
 */
@Nullable
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
// Synchronization is active for this thread: its thread-bound state must be captured and cleared.
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
// 1... Delegate to the concrete manager (in these notes DataSourceTransactionManager returns the connection holder).
suspendedResources = doSuspend(transaction);
}
// Capture the current transaction name...
String name = TransactionSynchronizationManager.getCurrentTransactionName();
// ...and reset it.
TransactionSynchronizationManager.setCurrentTransactionName(null);
// Capture the current read-only flag...
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
// ...and reset it.
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
// Capture the isolation level of the existing transaction...
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
// ...and reset it.
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
// Capture whether an actual transaction was active...
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
// ...and reset the flag.
TransactionSynchronizationManager.setActualTransactionActive(false);
// Bundle everything captured above into the holder handed back to the caller.
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException | Error ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
} else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
} else {
// Neither transaction nor synchronization active.
return null;
}
}
}
2. DefaultTransactionStatus
// Status object created by newTransactionStatus(...) for each transactional invocation.
// NOTE(review): this looks like a field-only excerpt - constructor and accessors are not shown here.
public class DefaultTransactionStatus extends AbstractTransactionStatus {
// The underlying transaction object; may be null.
@Nullable
private final Object transaction;
// Whether this status represents a new transaction (as opposed to participating in an existing one).
private final boolean newTransaction;
// Whether this status opened a new synchronization for the current thread.
private final boolean newSynchronization;
// Whether the transaction is read-only.
private final boolean readOnly;
// Whether debug logging is enabled for this transaction.
private final boolean debug;
// Resources suspended when this transaction started, to be resumed on completion; may be null.
@Nullable
private final Object suspendedResources;
}
3. createTrxIfNecessary
4. proceedWithInvocation()
// 5. 执行被代理的业务方法
retVal = invocation.proceedWithInvocation();
TxService#updService()
5. commitTrxAfterReturning()
// 8.1. Tx_commit => 1. 资源存储 2. 连接释放 3. 恢复挂起
commitTransactionAfterReturning(txInfo);
- TxDao跳过commit(),TxService进行commit()
DataSourceTransactionManager#doCommit()
5. propagation
传播属性 | 描述 |
---|---|
REQUIRED | 加入当前事务;如果没有,新建一个 |
SUPPORTS | 加入当前事务;如果没有,非事务运行 |
MANDATORY | 加入当前事务;如果没有,抛出异常 |
REQUIRES_NEW | 挂起当前事务(如果存在);新建事务运行 |
NOT_SUPPORTED | 挂起当前事务(如果存在);始终以非事务方式运行 |
NEVER | 如果存在当前事务,抛异常;如果没有,非事务运行 |
NESTED | 当前事务设置保存点,新建嵌套事务运行。如果没有,新建事务运行 |
1. REQUIRED-REQUIRED
- dao层,service层相互影响
<tx:attributes>
<!-- REQUIRED, REQUIRES_NEW, MANDATORY, NESTED, NEVER, NOT_SUPPORTED, SUPPORTS -->
<tx:method name="updService" propagation="REQUIRED"/>
<tx:method name="exceptionDao" propagation="REQUIRED"/>
</tx:attributes>
// Demo service method: calls updDao(1), then exceptionDao(2) inside a try/catch.
public void updService() {
txDao.updDao(1);
try {
txDao.exceptionDao(2);
} catch (Exception e) {
// The exception is only printed, not rethrown, so this method itself returns normally.
e.printStackTrace();
}
}
// Demo DAO method: decrements money for the given id, then always fails.
public void exceptionDao(int id) {
String sql = "update spring_tx set money=money-1 where id=?";
jdbcTemplate.update(sql, id);
// Always throw after the update to simulate a failure.
throw new ArithmeticException("/ by zero");
}
1. RollbackOnly
java.lang.ArithmeticException: / by zero
Exception in thread "main" org.springframework.transaction.UnexpectedRollbackException:
Transaction rolled back because it has been marked as rollback-only
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
- TxDao异常
DataSourceTransactionManager#doSetRollbackOnly()
- TxService真正rollback
DataSourceTransactionManager#doRollback()
AbstractPlatformTransactionManager#processRollback()
2. REQUIRED-NESTED
1. savepoint
set autocommit = 0;
UPDATE spring_tx SET money = money - 1 WHERE id = 1;
UPDATE spring_tx SET money = money - 1 WHERE id = 1;
UPDATE spring_tx SET money = money - 1 WHERE id = 1;
savepoint p;
UPDATE spring_tx SET money = money - 1 WHERE id = 2;
UPDATE spring_tx SET money = money - 1 WHERE id = 2;
UPDATE spring_tx SET money = money - 1 WHERE id = 2;
rollback to p;
commit;
# 只有前3条sql进行了commit
2. rollback
- dao层异常,不影响service层
<tx:attributes>
<!-- REQUIRED, REQUIRES_NEW, MANDATORY, NESTED, NEVER, NOT_SUPPORTED, SUPPORTS -->
<tx:method name="updService" propagation="REQUIRED"/>
<tx:method name="exceptionDao" propagation="NESTED"/>
</tx:attributes>
// Demo service method: calls updDao(1), then exceptionDao(2) inside a try/catch.
public void updService() {
txDao.updDao(1);
try {
txDao.exceptionDao(2);
} catch (Exception e) {
// The exception is only printed, not rethrown, so this method itself returns normally.
e.printStackTrace();
}
}
DruidPooledConnection#setSavepoint()
DruidPooledConnection#rollback()
3. REQUIRED-REQUIRES_NEW
- 外层Tx挂起,内层新Tx
<tx:attributes>
<tx:method name="exceptionService" propagation="REQUIRED"/>
<tx:method name="updDao" propagation="REQUIRES_NEW"/>
</tx:attributes>
// Demo service method: calls updDao(1), then always fails.
public void exceptionService() {
txDao.updDao(1);
// Always throw after the DAO call to simulate a failure in the service layer.
throw new ArithmeticException("/ by zero");
}
AbstractPlatformTransactionManager#handleExistingTransaction()
AbstractPlatformTransactionManager#resume()