百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

Spring事务管理(二)分布式事务管理之JTA与链式事务

lipiwang 2024-10-22 15:51 5 浏览 0 评论

什么是分布式事务

跨库的事务就属于分布式事务,比如对两个库的不同表同时修改和同时rollback等。

上一节中,我们只是演示了单个库(数据源)的事务处理。这一节主要讲如何处理多个数据源的事务。

为什么多数据源下不能使用普通事务来处理呢?

我想很多人都有这个问题,打个比方,分库分表后有个数据库A和数据库B,A中有抢票记录,B中有票数记录。当我们完成抢票功能,需要在B减少票数的同时在A中增加记录。但是如果有下面的代码发生:

@Transactional
public void multiDBTX(){
    B.reduce(ticketId);
    if (true){
        throw new RuntimeException("throw new exception");
    }
    A.save(result);
}

我在B扣除票数后抛出异常,然后执行A库添加记录。

如果没有分布式事务处理,则结果就是B票数扣除,但A没有保存记录。也就是出错后B并没有进行事务回滚。

那问题来了,怎么才能实现我们的要求呢。

分布式事务原则

CAP定理

web无法同时满足以下三点:

  1. 一致性: 所有数据变动都是同步的
  2. 可用性: 每个操作都必须有预期的响应
  3. 分区容错性: 出现单个节点无法可用,系统依然正常对外提供服务

BASE理论

BASE理论是对CAP中的一致性和可用性进行一个权衡的结果。核心思想是即使无法做到强一致性,但可以使用一些技术手段达到最终一致。

  1. Basically Available(基本可用):允许系统发生故障时,损失一部分可用性。
  2. Soft state(软状态):允许数据同步存在延迟。
  3. Eventually consistent(最终一致性): 不需要保持强一致性,最终一致即可。

那如何来实现分布式事务管理呢?

分布式事务管理实践

1. JTA实现

事务有效的屏蔽了底层事务资源,使应用可以以透明的方式参入到事务处理中,但是与本地事务相比,XA 协议的系统开销大

在这里我先带大家走出一个误解,你在网上搜JTA一般都是分布式事务用它,但是它就是用来做分布式事务的吗?不是的,我在上文说过,JTA只是Java实现XA事务的一个规范,我们在第一节Spring事务管理(一)快速入门中用到的事务,都可以叫JTA事务管理。下面主要说JTA实现分布式事务管理:

这里我们会用到Atomikos事务管理器,它是一个开源的事务管理器,实现了XA的一种分布式事务处理并可以嵌入到你的SpringBoot当中。

拓展:什么是XA

基本上所有的数据库都会支持XA事务,百度百科上说法:XA协议由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。简单的说,它是事务的标准,JTA也是它标准的java实现。

1.1 导入pom

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

1.2 设置数据源

SpringBoot设置多数据源这里只说下思路(重点还是说事务实现):

  1. 在application.yml中配置多数据源配置。
  2. 写配置类加载配置并放入DataSource并设置事务:
@Configuration
@DependsOn("transactionManager")
@EnableJpaRepositories(basePackages = "com.fantj.repository.user", entityManagerFactoryRef = "userEntityManager", transactionManagerRef = "transactionManager")
@EnableConfigurationProperties(UserDatasourceProperties.class)
public class UserConfig {

    @Autowired
    private JpaVendorAdapter jpaVendorAdapter;

    // 这里注入 dataSource信息的类 
    @Autowired
    private UserDatasourceProperties userDatasourceProperties;

    @Bean(name = "userDataSource")
    public DataSource userDataSource() {
        // 给XADataSource 设置 DataSource 属性
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        mysqlXaDataSource.setURL(userDatasourceProperties.getUrl());
        mysqlXaDataSource.setUser(userDatasourceProperties.getUser());
        mysqlXaDataSource.setPassword(userDatasourceProperties.getPassword());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
        // 创建 Atomiko, 并将 mysql的XA交给JTA管理
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        // 设置唯一资源名
        xaDataSource.setUniqueResourceName("datasource2");
        return xaDataSource;
    }

    @Bean(name = "userEntityManager")
    @DependsOn("transactionManager")
    public LocalContainerEntityManagerFactoryBean userEntityManager() throws Throwable {

        HashMap<String, Object> properties = new HashMap<String, Object>();
        properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
        properties.put("javax.persistence.transactionType", "JTA");

        LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
        // 给工厂bean设置 资源加载属性
        entityManager.setJtaDataSource(userDataSource());
        entityManager.setJpaVendorAdapter(jpaVendorAdapter);
        entityManager.setPackagesToScan("com.fantj.pojo.user");
        entityManager.setPersistenceUnitName("userPersistenceUnit");
        entityManager.setJpaPropertyMap(properties);
        return entityManager;
    }

}

这只是一个数据源的配置,第二个数据源的配置也类似,注意不能同Entity同Repository,映射放在不同包下实现。
两个都返回LocalContainerEntityManagerFactoryBean它便会交给@Transaction去管理,两个数据源配置完后。这样的代码B将会回滚。

@Transactional
public void multiDBTX(){
    B.reduce(ticketId);
    if (true){
        throw new RuntimeException("throw new exception");
    }
    A.save(result);
}

1.3 JTA缺点

因为JTA采用两阶段提交方式,第一次是预备阶段,第二次才是正式提交。当第一次提交出现错误,则整个事务出现回滚,一个事务的时间可能会较长,因为它要跨越多个 数据库 多个数据资源的的操作,所以在性能上可能会造成吞吐量低。而且,它只能用在单个服务内。一个完善的JTA事务还需要同时考虑很多元素,这只是个示例。

2. 链式事务管理

链式事务就是声明一个ChainedTransactionManager 将所有的数据源事务按顺序放到该对象中,则事务会按相反的顺序来执行事务。

网上发现了一个链式事务管理的处理顺序,总结的很到位。

1.start message transaction
2.receive message
3.start database transaction
4.update database
5.commit database transaction
6.commit message transaction   ##当这一步出现错误时,上面的因为已经commit,所以不会rollback

可以看到,从345可以看到,它后拿到的事务先提交,这就导致如果1出错,则不会进行数据回滚。跟Spring的同步事务差不多,同步事务也是这种特性。

下面我会测试这个性质。

为了方便,我拿JdbcTemplate来测试该事务。

DBConfig.java

配置DataSource以及返回Template新实例和链式事务配置。

/**
 * DB配置类
 */
@Configuration
public class DBConfig {
    /**
     * user-DB配置
     */
    @Bean
    @Primary
    @ConfigurationProperties(prefix = "spring.datasource.user")
    public DataSourceProperties userDataSourceProperties(){
        return new DataSourceProperties();
    }

    @Bean
    @Primary
    public DataSource userDataSource(){
        return userDataSourceProperties().initializeDataSourceBuilder().type(HikariDataSource.class).build();
    }

    @Bean
    public JdbcTemplate userJdbcTemplate(@Qualifier("userDataSource") DataSource userDataSource){
        return new JdbcTemplate(userDataSource);
    }
    /**
     * result-DB配置
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.result")
    public DataSourceProperties resultDataSourceProperties(){
        return new DataSourceProperties();
    }

    @Bean
    public DataSource resultDataSource(){
        return resultDataSourceProperties().initializeDataSourceBuilder().type(HikariDataSource.class).build();
    }

    @Bean
    public JdbcTemplate resultJdbcTemplate(@Qualifier("resultDataSource") DataSource resultDataSource){
        return new JdbcTemplate(resultDataSource);
    }
    /**
     * 链式事务配置
     */
    @Bean
    public PlatformTransactionManager transactionManager(){
        DataSourceTransactionManager userTM = new DataSourceTransactionManager(userDataSource());
        DataSourceTransactionManager resultTM = new DataSourceTransactionManager(resultDataSource());
        return new ChainedTransactionManager(userTM,resultTM);
    }
}

transactionManager()方法实现了链式事务配置,注意我放置的顺序先userTM后resultTM,所以事务应该是先拿到userTM然后拿到resultTM然后提交resultTM最后提交userTM,也就是说,如果我在提交user事务的时候出错,此时result相关的事务已经提交完成,所以result数据是不能回滚的。

2.1 测试

@RequestMapping("")
@Transactional
public void testTX(){
//        resultJdbcTemplate.execute("insert into result values(68,6,6)");
    userJdbcTemplate.execute("insert into user values (6,'FantJ',23,'男')");
    if (true){
        throw new RuntimeException("yes , throw one exception");
    }
    resultJdbcTemplate.execute("insert into result values(66,6,6)");
//        userJdbcTemplate.execute("insert into user values (8,'FantJ',23,'男')");
}

两个数据库没有内容。

控制台精简后的日志:

Creating new transaction with name [springbootjtamultidb.jtamulti.JtaMultiApplicationTests.testTX]: 
Creating new transaction with name [springbootjtamultidb.jtamulti.JtaMultiApplicationTests.testTX]: 
Began transaction (1) for test context [DefaultTestContext@1dde4cb2 testClass = ...
Executing SQL statement [insert into user values (6,'FantJ',23,'男')]
Initiating transaction rollback
Rolling back JDBC transaction on Connection [HikariProxyConnection@318550723 wrapping com.mysql.cj.jdbc.ConnectionImpl@57bd6a8f]
Releasing JDBC Connection [HikariProxyConnection@318550723 wrapping com.mysql.cj.jdbc.
Initiating transaction rollback
Rolling back JDBC transaction on Connection [HikariProxyConnection@1201991394 wrapping com.mysql.cj.jdbc.ConnectionImpl@36f6e521]
Releasing JDBC Connection [HikariProxyConnection@1201991394 wrapping com.mysql.cj.jdbc.ConnectionImpl@36f6e521] after transaction
Resuming suspended transaction after completion of inner transaction
Rolled back transaction for test: [DefaultTestContext@1dde4cb2 testClass = JtaMultiApplicationTests, testInstance = springbootjtamultidb.jtamulti.JtaMultiApplicationTests@441772e,


其实只要是两个dao操作中间出错或者第一个dao操作之前出错,事务都能正常回滚。如果result操作再前,user操作再后,user操作完抛出异常,也能回滚事务,原因上文有讲。

@RequestMapping("")
@Transactional
public void testTX(){
    resultJdbcTemplate.execute("insert into result values(68,6,6)");
//        userJdbcTemplate.execute("insert into user values (6,'FantJ',23,'男')");
//        if (true){
//            throw new RuntimeException("yes , throw one exception");
//        }
//        resultJdbcTemplate.execute("insert into result values(66,6,6)");
    userJdbcTemplate.execute("insert into user values (8,'FantJ',23,'男')");
    if (true){
        throw new RuntimeException("yes , throw one exception");
    }
}

这段代码也能正常回滚,结果我就不贴了。(浪费大家精力)

2.2 验证第二个事务不能回滚的情况

重要的事情再重复一遍:注意我放置的顺序先userTM后resultTM,所以事务应该是先拿到userTM然后拿到resultTM然后提交resultTM最后提交userTM,也就是说,如果我在提交user事务的时候出错,此时result相关的事务已经提交完成,所以result数据是不能回滚的。

代码和之前的一样,需要在事务提交的方法中打断点
@RequestMapping("")
@Transactional
public void testTX(){
    resultJdbcTemplate.execute("insert into result values(68,6,6)");
    userJdbcTemplate.execute("insert into user values (8,'FantJ',23,'男')");
}


注意拦截到断点时,先放行一个commit,也就是result事务的commit,然后拦截到第二个commit请求时,关闭user所在的数据库,然后放行。

下面是将第一个commit请求放行后的控制台日志:

Creating new transaction with name [springbootjtamultidb.jtamulti.controller.MainController.
Acquired Connection [HikariProxyConnection@810768078 wrapping com.mysql.cj.jdbc.ConnectionImpl@3cfb22cd] for JDBC transaction
Switching JDBC Connection [HikariProxyConnection@810768078 wrapping com.mysql.cj.jdbc.ConnectionImpl@3cfb22cd] to manual commit
Executing SQL statement [insert into result values(68,6,6)]
Executing SQL statement [insert into user values (8,'FantJ',23,'男')]
Initiating transaction commit
Committing JDBC transaction on Connection [HikariProxyConnection@810768078 wrapping com.mysql.cj.jdbc.ConnectionImpl@3cfb22cd]
 Releasing JDBC Connection [HikariProxyConnection@810768078 wrapping com.mysql.cj.jdbc.ConnectionImpl@3cfb22cd] after transaction
 Resuming suspended transaction after completion of inner transaction

注意倒数第三行日志,它出现证明我们第一个result的sql事务已经提交,此时你刷新数据库数据已经更新了,但是我们断点并还没有放行,user的事务还没有提交,我把user的数据库源关闭,再放行,可以看到,result已经有数据,user没有数据,此时result并没有进行回滚,这是链式事务的缺点。


谈谈使用环境

JTA优缺点

JTA它的缺点是二次提交,事务时间长,数据锁的时间太长,性能比较低。

优点是强一致性,对于数据一致性要求很强的业务很有利,而且可以用于微服务。

链式/同步事务优缺点

优点: 比JTA轻量,能满足大部分事务需求,也是强一致性。

缺点: 只能单机玩,不能用于微服务,事务依次提交后提交的事务若出错不能回滚。

它两的比较

  1. JTA重,Chained轻。
  2. JTA能用于微服务分布式事务,Chained只能用于单机分布式事务。

事实上我们处理分布式事务都要求做到最终一致性。就是你刚开始我不需要保持你的数据一致,你中间可以出错,但是我能保证最终数据是一致的。这种做法性能最高,下一章节会谈。

相关推荐

想减少Windows 11内存占用?请取消固定Teams

如果你想要提高Windows11系统的运行速度,那么可以禁用某些默认启用的功能和设置。如果你的Windows11是安装在已经停止支持的设备或者内存容量不高的旧设备,那么应该立即限制或禁用固...

Windows查看端口占用、查看PID对应的进程、并终止进程

Windows下:查看端口占用netstat-ano|findstr"端口号"获取到pid查看PID对应的进程tasklist|findstr"进程ID"...

计算机组成原理(36): 分时之一——进程

建立一个虚拟机VM目标:给每个程序一个自己的虚拟机“VirtualMachine”,程序并不知道其他的虚拟机。1.1进程(Process)为了捕获正在运行的程序,我们创建一个称为“进程(Proce...

window系统如何停止端口被占用的进程(高手版)

如上图1,作为开发人员是不是经常遇到这个问题?(Webserverfailedtostart.Port9527wasalreadyinuse.)当然,如果在你知道确实有某个进程正占...

电脑的文件无法删除咋回事?你需要这款神兵利器

很多朋友用电脑的时候,都遇到过文件无法删除的情况。这往往是由于文件被某个软件、进程所调用所引发的——在Windows中,某个文件如果被使用,这个文件可能就没法进行删除、重命名之类的操作了。想要进一步操...

Windows日志分析(windows 日志文件)

1.Windows日志文件简介1.1Windows日志核心分类1.系统日志系统日志包含由Windows系统组件记录的事件,记录系统进程和设备驱动程序的活动。由它审核的系统事件包括启动失败的设备驱动程...

电脑软件崩溃、闪退不用慌!DJS Tech 教你几招轻松解决

当你正全神贯注用电脑处理重要文件、沉浸在精彩的游戏世界,或是观看喜欢的视频时,软件突然崩溃、闪退,那一刻的烦躁简直难以言喻。别着急,DJSTech作为深耕计算机领域多年的专业团队,为你带来一系列超...

微软Win11推进淘汰控制面板,时间服务器配置迁移至设置应用

IT之家5月29日消息,科技媒体Winaero昨日(5月28日)发布博文,报道称微软在Windows11系统中,继续推进“淘汰控制面板”进程,配置时间服务器地址选项迁移到设置应...

微软 PowerToys更新,可帮你找出 Win11上哪些进程正在占用该文件

IT之家11月3日消息,微软针对Windows11和Windows10的PowerToys已经更新到了最新的0.64.0版本,并上线了一个名为“文件锁匠FileLock...

Windows基础操作 认识任务管理器(windows任务管理器的使用)

Windows基础操作:认识任务管理器任务管理器(TaskManager)是Windows系统中一个功能强大的实用工具,它为用户提供了实时监控系统资源、管理正在运行的程序和服务的能力。掌握任务管理器...

windows——netstat过滤(终止进程)

windows——netstat过滤(终止进程)在Windows操作系统中,使用netstat命令可以查看网络连接的状态。要过滤特定协议或端口的连接,可以使用以下命令:查看所有连接:netstat-...

只要这么做 Windows Defender与第三方就能和平共存啦

无论大家是否喜欢WindowsDefender,伴随着Windows10的不断升级,它已经成为系统的底层必备组件之一。虽然我们有各种各样的方法去关闭它,换用顺手的第三方,但只要更新打补丁,噩梦就来...

Win10如何彻底关闭wsappx进程(win10 wsappx怎么关闭)

win10如何彻底关闭wsappx进程?wsappx进程是什么?wsappx进程是Windows10系统的一部分,这个进程是WindowsStore和微软通用应用程序平台(UWP)的依赖进程。...

Windows环境黑客入侵应急与排查(黑客入侵电脑原理)

1文件分析1.1临时目录排查黑客往往可能将病毒放在临时目录(tmp/temp),或者将病毒相关文件释放到临时目录,因此需要检查临时目录是否存在异常文件。假设系统盘在C盘,则通常情况下的临时目录如下...

Windows 11 24H2 KB5044384出现大面积安装失败、任务管理器0进程等问题

Windows11KB5044384更新由于出现大量错误而无法在Windows1124H2上安装、其中包括一个奇怪的错误,即由于0x800f0922、0x800736b3和0x8...

取消回复欢迎 发表评论: