TCC方案(采用Hmily框架)
什么是TCC事务 ?
TCC是Try、Confirm、Cancel三个词语的缩写,TCC要求每个分支事务实现三个操作:预处理Try、确认 Confirm、撤销Cancel。Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的 操作即回滚操作。TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所 有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel 操作若执行失败,TM会进行重试。

分支事务失败的情况:

TCC分为三个阶段:
1. Try 阶段是做业务检查(一致性)及资源预留(隔离),此阶段仅是一个初步操作,它和后续的Confirm 一起才能 真正构成一个完整的业务逻辑。
2. Confirm 阶段是做确认提交,Try阶段所有分支事务执行成功后开始执行 Confirm。通常情况下,采用TCC则 认为 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引 入重试机制或人工处理。
3. Cancel 阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放。通常情况下,采 用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。
4. TM事务管理器
TM事务管理器可以实现为独立的服务,也可以让全局事务发起方充当TM的角色,TM独立出来是为了成为公 用组件,是为了考虑系统结构和软件复用。
TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条,用来记录事务上下文, 追踪和记录状态,由于Confirm 和cancel失败需进行重试,因此需要实现为幂等,幂等性是指同一个操作无论请求 多少次,其结果都相同。
TCC 解决方案
目前市面上的TCC框架众多比如下面这几种:

上一节所讲的Seata也支持TCC。我们的目标是理解TCC的原 理以及事务协调运作的过程,因此更请倾向于轻量级易于理解的框架,因此最终确定了Hmily。
Hmily是一个高性能分布式事务TCC开源框架。基于Java语言来开发(JDK1.8),支持Dubbo,Spring Cloud等 RPC框架进行分布式事务。它目前支持以下特性:
- 支持嵌套事务(Nested transaction support).
- 采用disruptor框架进行事务日志的异步读写,与RPC框架的性能毫无差别。
- 支持SpringBoot-starter 项目启动,使用简单。
- RPC框架支持 : dubbo,motan,springcloud。
- 本地事务存储支持 : redis,mongodb,zookeeper,file,mysql。
- 事务日志序列化支持 :java,hessian,kryo,protostuff。
- 采用Aspect AOP 切面思想与Spring无缝集成,天然支持集群。
- RPC事务恢复,超时异常恢复等。
Hmily利用AOP对参与分布式事务的本地方法与远程方法进行拦截处理,通过多方拦截,事务参与者能透明的 调用到另一方的Try、Confirm、Cancel方法;传递事务上下文;并记录事务日志,酌情进行补偿,重试等。
Hmily不需要事务协调服务,但需要提供一个数据库(mysql/mongodb/zookeeper/redis/file)来进行日志存 储。
Hmily实现的TCC服务与普通的服务一样,只需要暴露一个接口,也就是它的Try业务。Confirm/Cancel业务 逻辑,只是因为全局事务提交/回滚的需要才提供的,因此Confirm/Cancel业务只需要被Hmily TCC事务框架 发现即可,不需要被调用它的其他业务服务所感知。
官网介绍:https://dromara.org/website/zh-cn/docs/hmily/index.html
TCC需要注意三种异常处理分别是空回滚、幂等、悬挂:
空回滚:
在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回 滚,然后直接返回成功。
出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶 段,当故障恢复后,分布式事务进行回滚则会调用二阶段的Cancel方法,从而形成空回滚。
解决思路是关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回 滚;如果没执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分 布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务 ID 和分支事务 ID,第一阶段 Try 方法里会 插入一条记录,表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存 在,则是空回滚。
幂等:
通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求 TCC 的二阶段 Try、 Confirm 和 Cancel 接口保证幂等,这样不会重复使用或者释放资源。
如果幂等控制没有做好,很有可能导致数据 不一致等严重问题。
解决思路在上述“分支事务记录”中增加执行状态,每次执行前都查询该状态。
悬挂:
悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。
出现原因是在 RPC 调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时 RPC 调用的网络发生拥堵, 通常 RPC 调用是有超时时间的,RPC 超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC 请求 才到达参与者真正执行,而一个 Try 方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预 留的业务资源就再也没有人能够处理了,对于这种情况,我们就称为悬挂,即业务资源预留后没法继续处理。
解决思路是如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下,“分支 事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。
举例,场景为 A 转账 30 元给 B,A和B账户在不同的服务
方案1:
账户A

账户B

方案1说明:
1)账户A,这里的余额就是所谓的业务资源,按照前面提到的原则,在第一阶段需要检查并预留业务资源,因此, 我们在扣钱 TCC 资源的 Try 接口里先检查 A 账户余额是否足够,如果足够则扣除 30 元。 Confirm 接口表示正式 提交,由于业务资源已经在 Try 接口里扣除掉了,那么在第二阶段的 Confirm 接口里可以什么都不用做。Cancel 接口的执行表示整个事务回滚,账户A回滚则需要把 Try 接口里扣除掉的 30 元还给账户。
2)账号B,在第一阶段 Try 接口里实现给账户B加钱,Cancel 接口的执行表示整个事务回滚,账户B回滚则需要把 Try 接口里加的 30 元再减去。
方案1的问题分析:
1)如果账户A的try没有执行在cancel则就多加了30元。 (try中还没执行到减30元就异常回滚了,此时因为回滚需要执行cancel中补充方案加30元)(空回滚问题:没执行try资源就去执行回滚cancel方法)
2)由于try,cancel、confirm都是由单独的线程去调用,且会出现重复调用,所以都需要实现幂等。(三个方法都要进行幂等设置)
3)账号B在try中增加30元,当try执行完成后可能会其它线程给消费了。 (刚刚增加30元,其他代码还不知道会不会发现异常就被其他的线程提前把这30块拿去花了)
4)如果账户B的try没有执行在cancel则就多减了30元。(和(1)情况类似)(空回滚)
这里还需要考虑悬挂的问题。
问题解决:
1)账户A的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。 (只有try执行了(真正加钱了)才能进行回滚出来(减钱操作))
2)try,cancel、confirm方法实现幂等。
3)账号B在try方法中不允许更新账户金额,在confirm中更新账户金额。 (防止try阶段刚加的钱就被拿去花了)
4)账户B的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。(和(1)类似)
优化方案:
账户A

账户B

Hmily实现TCC事务
4.3.1.业务说明
本实例通过Hmily实现TCC分布式事务,模拟两个账户的转账交易过程。
两个账户分别在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个微服务。
交易过程是,张三给 李四转账指定金额。 上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。

数据库:MySQL-5.7.25
JDK:64位 jdk1.8.0_201
微服务:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
Hmily:hmily-springcloud.2.0.4-RELEASE
微服务及数据库的关系 :
dtx/dtx-tcc-demo/dtx-tcc-demo-bank1 银行1,操作张三账户, 连接数据库bank1
dtx/dtx-tcc-demo/dtx-tcc-demo-bank2 银行2,操作李四账户,连接数据库bank2
服务注册中心:dtx/discover-server
数据库准备
(1) hmily数据库(hmily用来记录异常的事务的信息)
CREATE DATABASE `hmily` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci
(2) 创建bank1库,并导入以下表结构和数据(包含张三账户)
CREATE DATABASE `bank1` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci
CREATE TABLE `account_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL COMMENT '户主姓名',
`card_number` int(11) DEFAULT NULL COMMENT '银行卡号',
`password` varchar(255) DEFAULT NULL COMMENT '帐户密码',
`balance` int(11) DEFAULT NULL COMMENT '帐户余额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
insert into account_info(name,card_number,password,balance) values('张三用户',1,'123456',10000)
(3) 创建bank2库,并导入以下表结构和数据(包含李四账户)
CREATE DATABASE `bank2` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci
CREATE TABLE `account_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL COMMENT '户主姓名',
`card_number` int(11) DEFAULT NULL COMMENT '银行卡号',
`password` varchar(255) DEFAULT NULL COMMENT '帐户密码',
`balance` int(11) DEFAULT NULL COMMENT '帐户余额',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
insert into account_info(name,card_number,password,balance) values('李四用户',1,'123456',0)
(4) bank1、bank2 创建共同表(这几张表是用来各个方法的幂等判断的,以全局事务id进行幂等判断)
CREATE TABLE `local_try_log` (
`tx_no` varchar(64) NOT NULL COMMENT '事务id',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `local_confirm_log`(
`tx_no` varchar(64) NOT NULL COMMENT '事务id',
`create_time` datetime DEFAULT NULL
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `local_cancel_log`(
`tx_no` varchar(64) NOT NULL COMMENT '事务id',
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`tx_no`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
两个微服务集成Hmily
注意:在集成前自己要配置一个eureka的注册中心出来
1 添加依赖
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily-spring-boot-starter-springcloud</artifactId>
<version>2.0.6-RELEASE</version>
</dependency>
2 修改配置文件
注意:如果是事务的发起方即bank1,此时的started属性配置为true,如果是bank2则设置为false。
bank1的yml核心配置
server:
port: 56081
servlet:
context-path: /bank1
eureka:
client:
serviceUrl:
defaultZone: http://localhost:56080/eureka/
spring:
application:
name: tcc-demo-bank1
##################### DB #####################
datasource:
ds0:
url: jdbc:mysql://localhost:3306/bank1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: 123456
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT user()
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
connection-properties: druid.stat.mergeSql:true;druid.stat.slowSqlMillis:5000
org:
dromara:
hmily :
serializer : kryo
recoverDelayTime : 30
retryMax : 30
scheduledDelay : 30
scheduledThreadMax : 10
repositorySupport : db
started: true #注意这个设置
hmilyDbConfig :
driverClassName : com.mysql.cj.jdbc.Driver
url : jdbc:mysql://localhost:3306/hmily?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
username : root
password : 123456
bank2的yml核心配置
server:
port: 56082
servlet:
context-path: /bank2
eureka:
client:
serviceUrl:
defaultZone: http://localhost:56080/eureka/
spring:
application:
name: tcc-demo-bank2
##################### DB #####################
datasource:
ds0:
url: jdbc:mysql://localhost:3306/bank2?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: 123456
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT user()
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
connection-properties: druid.stat.mergeSql:true;druid.stat.slowSqlMillis:5000
org:
dromara:
hmily :
serializer : kryo
recoverDelayTime : 30
retryMax : 30
scheduledDelay : 30
scheduledThreadMax : 10
repositorySupport : db
started: false
hmilyDbConfig :
driverClassName : com.mysql.cj.jdbc.Driver
url : jdbc:mysql://localhost:3306/hmily?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
username : root
password : 123456
3、编写Hmily的配置类(两个服务都要写)
package cn.itcast.dtx.tccdemo.bank1.config;
import com.alibaba.druid.pool.DruidDataSource;
import org.dromara.hmily.common.config.HmilyDbConfig;
import org.dromara.hmily.core.bootstrap.HmilyTransactionBootstrap;
import org.dromara.hmily.core.service.HmilyInitService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.core.env.Environment;
@Configuration
@EnableAspectJAutoProxy(proxyTargetClass=true)
public class DatabaseConfiguration {
private final ApplicationContext applicationContext;
@Autowired
private Environment env;
public DatabaseConfiguration(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.ds0")
public DruidDataSource ds0() {
DruidDataSource druidDataSource = new DruidDataSource();
return druidDataSource;
}
@Bean
public HmilyTransactionBootstrap hmilyTransactionBootstrap(HmilyInitService hmilyInitService){
HmilyTransactionBootstrap hmilyTransactionBootstrap = new HmilyTransactionBootstrap(hmilyInitService);
hmilyTransactionBootstrap.setSerializer(env.getProperty("org.dromara.hmily.serializer"));
hmilyTransactionBootstrap.setRecoverDelayTime(Integer.parseInt(env.getProperty("org.dromara.hmily.recoverDelayTime")));
hmilyTransactionBootstrap.setRetryMax(Integer.parseInt(env.getProperty("org.dromara.hmily.retryMax")));
hmilyTransactionBootstrap.setScheduledDelay(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledDelay")));
hmilyTransactionBootstrap.setScheduledThreadMax(Integer.parseInt(env.getProperty("org.dromara.hmily.scheduledThreadMax")));
hmilyTransactionBootstrap.setRepositorySupport(env.getProperty("org.dromara.hmily.repositorySupport"));
hmilyTransactionBootstrap.setStarted(Boolean.parseBoolean(env.getProperty("org.dromara.hmily.started")));
HmilyDbConfig hmilyDbConfig = new HmilyDbConfig();
hmilyDbConfig.setDriverClassName(env.getProperty("org.dromara.hmily.hmilyDbConfig.driverClassName"));
hmilyDbConfig.setUrl(env.getProperty("org.dromara.hmily.hmilyDbConfig.url"));
hmilyDbConfig.setUsername(env.getProperty("org.dromara.hmily.hmilyDbConfig.username"));
hmilyDbConfig.setPassword(env.getProperty("org.dromara.hmily.hmilyDbConfig.password"));
hmilyTransactionBootstrap.setHmilyDbConfig(hmilyDbConfig);
return hmilyTransactionBootstrap;
}
}
4、启动类增加@EnableAspectJAutoProxy并增加org.dromara.hmily的扫描项:(两个服务都要写)
@SpringBootApplication(exclude = MongoAutoConfiguration.class)
@EnableDiscoveryClient
@EnableAspectJAutoProxy
@EnableHystrix
@EnableFeignClients(basePackages = {"cn.itcast.dtx.tccdemo.bank1.spring"})
@ComponentScan({"cn.itcast.dtx.tccdemo.bank1","org.dromara.hmily"})
public class Bank1TccServer {
public static void main(String[] args) {
SpringApplication.run(Bank1TccServer.class, args);
}
}
@SpringBootApplication(exclude = MongoAutoConfiguration.class)
@EnableDiscoveryClient
@EnableHystrix
@EnableAspectJAutoProxy
@ComponentScan({"cn.itcast.dtx.tccdemo.bank2","org.dromara.hmily"})
public class Bank2TccServer {
public static void main(String[] args) {
SpringApplication.run(Bank2TccServer.class, args);
}
}
此时则集成Hmily成功。
下面使用Hmily实现TCC事务。
使用Hmily实现TCC事务
编写bank1
实体类:
@Data
public class AccountInfo implements Serializable {
private Long id;
private String accountName;
private String accountNo;
private String accountPassword;
private Double accountBalance;
}
dao:写好了对于幂等性判断的各种语句了
@Mapper
@Component
public interface AccountInfoDao {
@Update("update account_info set account_balance=account_balance - #{amount} where account_balance>=#{amount} and account_no=#{accountNo} ")
int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
@Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ")
int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
/**
* 增加某分支事务try执行记录
* @param localTradeNo 本地事务编号
* @return
*/
@Insert("insert into local_try_log values(#{txNo},now());")
int addTry(String localTradeNo);
@Insert("insert into local_confirm_log values(#{txNo},now());")
int addConfirm(String localTradeNo);
@Insert("insert into local_cancel_log values(#{txNo},now());")
int addCancel(String localTradeNo);
/**
* 查询分支事务try是否已执行
* @param localTradeNo 本地事务编号
* @return
*/
@Select("select count(1) from local_try_log where tx_no = #{txNo} ")
int isExistTry(String localTradeNo);
/**
* 查询分支事务confirm是否已执行
* @param localTradeNo 本地事务编号
* @return
*/
@Select("select count(1) from local_confirm_log where tx_no = #{txNo} ")
int isExistConfirm(String localTradeNo);
/**
* 查询分支事务cancel是否已执行
* @param localTradeNo 本地事务编号
* @return
*/
@Select("select count(1) from local_cancel_log where tx_no = #{txNo} ")
int isExistCancel(String localTradeNo);
}
service
public interface AccountInfoService {
//账户扣款
public void updateAccountBalance(String accountNo, Double amount);
}
impl:这里就是实现TCC事务的三个方法的地方。
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
@Autowired
Bank2Client bank2Client;
// 账户扣款,就是tcc的try方法
/**
* try幂等校验
* try悬挂处理
* 检查余额是够扣减金额
* 扣减金额
* @param accountNo
* @param amount
*/
@Override
@Transactional
//只要标记@Hmily就是try方法,在注解中指定confirm、cancel两个方法的名字
@Hmily(confirmMethod="commit",cancelMethod="rollback")
public void updateAccountBalance(String accountNo, Double amount) {
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 try begin 开始执行...xid:{}",transId);
//幂等判断 判断local_try_log表中是否有try日志记录,如果有则不再执行
if(accountInfoDao.isExistTry(transId)>0){
log.info("bank1 try 已经执行,无需重复执行,xid:{}",transId);
return ;
}
//try悬挂处理,如果cancel、confirm有一个已经执行了,try不再执行
if(accountInfoDao.isExistConfirm(transId)>0 || accountInfoDao.isExistCancel(transId)>0){
log.info("bank1 try悬挂处理 cancel或confirm已经执行,不允许执行try,xid:{}",transId);
return ;
}
//扣减金额
if(accountInfoDao.subtractAccountBalance(accountNo, amount)<=0){
//扣减失败
throw new RuntimeException("bank1 try 扣减金额失败,xid:{}"+transId);
}
//插入try执行记录,用于幂等判断
accountInfoDao.addTry(transId);
//远程调用李四,转账
if(!bank2Client.transfer(amount)){
throw new RuntimeException("bank1 远程调用李四微服务失败,xid:{}"+transId);
}
if(amount == 2){
throw new RuntimeException("人为制造异常,xid:{}"+transId);
}
log.info("bank1 try end 结束执行...xid:{}",transId);
}
//confirm方法
@Transactional
public void commit(String accountNo, Double amount){
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 confirm begin 开始执行...xid:{},accountNo:{},amount:{}",transId,accountNo,amount);
}
/** cancel方法
* cancel幂等校验
* cancel空回滚处理
* 增加可用余额
* @param accountNo
* @param amount
*/
@Transactional
public void rollback(String accountNo, Double amount){
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 cancel begin 开始执行...xid:{}",transId);
// cancel幂等校验
if(accountInfoDao.isExistCancel(transId)>0){
log.info("bank1 cancel 已经执行,无需重复执行,xid:{}",transId);
return ;
}
//cancel空回滚处理,如果try没有执行,cancel不允许执行
if(accountInfoDao.isExistTry(transId)<=0){
log.info("bank1 空回滚处理,try没有执行,不允许cancel执行,xid:{}",transId);
return ;
}
// 增加可用余额
accountInfoDao.addAccountBalance(accountNo,amount);
//插入一条cancel的执行记录
accountInfoDao.addCancel(transId);
log.info("bank1 cancel end 结束执行...xid:{}",transId);
}
}
控制层:
@RestController
public class Bank1Controller {
@Autowired
AccountInfoService accountInfoService;
@RequestMapping("/transfer")
public Boolean transfer(@RequestParam("amount") Double amount) {
this.accountInfoService.updateAccountBalance("1", amount);
return true;
}
}
feign:注意这里要加上@Hmily这个注解
@FeignClient(value="tcc-demo-bank2")
public interface Bank2Client {
//远程调用李四的微服务
@GetMapping("/bank2/transfer")
@Hmily
public Boolean transfer(@RequestParam("amount") Double amount);
}
这样Bank1的代码编写完成
编写Bank2的代码
实体类:
@Data
public class AccountInfo implements Serializable {
private Long id;
private String accountName;
private String accountNo;
private String accountPassword;
private Double accountBalance;
}
dao:
@Component
@Mapper
public interface AccountInfoDao {
@Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ")
int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
/**
* 增加某分支事务try执行记录
* @param localTradeNo 本地事务编号
* @return
*/
@Insert("insert into local_try_log values(#{txNo},now());")
int addTry(String localTradeNo);
@Insert("insert into local_confirm_log values(#{txNo},now());")
int addConfirm(String localTradeNo);
@Insert("insert into local_cancel_log values(#{txNo},now());")
int addCancel(String localTradeNo);
/**
* 查询分支事务try是否已执行
* @param localTradeNo 本地事务编号
* @return
*/
@Select("select count(1) from local_try_log where tx_no = #{txNo} ")
int isExistTry(String localTradeNo);
/**
* 查询分支事务confirm是否已执行
* @param localTradeNo 本地事务编号
* @return
*/
@Select("select count(1) from local_confirm_log where tx_no = #{txNo} ")
int isExistConfirm(String localTradeNo);
/**
* 查询分支事务cancel是否已执行
* @param localTradeNo 本地事务编号
* @return
*/
@Select("select count(1) from local_cancel_log where tx_no = #{txNo} ")
int isExistCancel(String localTradeNo);
}
service
public interface AccountInfoService {
//账户扣款
public void updateAccountBalance(String accountNo, Double amount);
}
impl
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
@Override
@Hmily(confirmMethod="confirmMethod", cancelMethod="cancelMethod")
public void updateAccountBalance(String accountNo, Double amount) {
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank2 try begin 开始执行...xid:{}",transId);
}
/**
* confirm方法
* confirm幂等校验
* 正式增加金额
* @param accountNo
* @param amount
*/
@Transactional
public void confirmMethod(String accountNo, Double amount){
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank2 confirm begin 开始执行...xid:{}",transId);
if(accountInfoDao.isExistConfirm(transId)>0){
log.info("bank2 confirm 已经执行,无需重复执行...xid:{}",transId);
return ;
}
//增加金额
accountInfoDao.addAccountBalance(accountNo,amount);
//增加一条confirm日志,用于幂等
accountInfoDao.addConfirm(transId);
log.info("bank2 confirm end 结束执行...xid:{}",transId);
}
/**
* @param accountNo
* @param amount
*/
public void cancelMethod(String accountNo, Double amount){
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank2 cancel begin 开始执行...xid:{}",transId);
}
}
controller
@RestController
public class Bank2Controller {
@Autowired
AccountInfoService accountInfoService;
@RequestMapping("/transfer")
public Boolean transfer(@RequestParam("amount") Double amount) {
this.accountInfoService.updateAccountBalance("2", amount);
return true;
}
}
进行测试
正常测试:

Bank1的账户少了1块钱

Bank2的账户多了1块钱

非正常测试: 此时将Bank1的try方法最后模拟一个异常, int a =10/0;此时已经进行远程调用了,即Bank1已经调用Bank2去进行加钱操作,而Bank1又因为异常要回滚(即Bank1不进行减钱)。这是我们看Bank2的加钱操作是否能成功
@Override
@Transactional
//只要标记@Hmily就是try方法,在注解中指定confirm、cancel两个方法的名字
@Hmily(confirmMethod="commit",cancelMethod="rollback")
public void updateAccountBalance(String accountNo, Double amount) {
//获取全局事务id
String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
log.info("bank1 try begin 开始执行...xid:{}",transId);
//幂等判断 判断local_try_log表中是否有try日志记录,如果有则不再执行
if(accountInfoDao.isExistTry(transId)>0){
log.info("bank1 try 已经执行,无需重复执行,xid:{}",transId);
return ;
}
//try悬挂处理,如果cancel、confirm有一个已经执行了,try不再执行
if(accountInfoDao.isExistConfirm(transId)>0 || accountInfoDao.isExistCancel(transId)>0){
log.info("bank1 try悬挂处理 cancel或confirm已经执行,不允许执行try,xid:{}",transId);
return ;
}
//扣减金额
if(accountInfoDao.subtractAccountBalance(accountNo, amount)<=0){
//扣减失败
throw new RuntimeException("bank1 try 扣减金额失败,xid:{}"+transId);
}
//插入try执行记录,用于幂等判断
accountInfoDao.addTry(transId);
//远程调用李四,转账
if(!bank2Client.transfer(amount)){
throw new RuntimeException("bank1 远程调用李四微服务失败,xid:{}"+transId);
}
if(amount == 2){
throw new RuntimeException("人为制造异常,xid:{}"+transId);
}
log.info("bank1 try end 结束执行...xid:{}",transId);
//在远程调用完后,模拟异常
int a =10/0;
}
重启运行。

Bank1:报错

Bank2控制台:会打印日志

此时查看数据库:发现张三和李四的钱没多也没少。


所以这就表明了Hmily的Tcc事务控制成功运行。
这里要发现一个问题,因为Bank2的try没写任何实现,所以try一定会成功执行,执行完后一定执行confirm方法,而confirm如果执行异常,此时不会进行回滚操作,而是进行confrim重试操作。如果还是不行可以进行手工处理或补偿处理(可以用定时任务处理等)
注意:这里也许有人会说,我们进行幂等判断的时候,对数据库的操作不是原子性的,如果并发怎么办?
这里需要说明一下,在数据库层面进行幂等判断时,可以通过乐观锁、悲观锁外,还有一种就是唯一性约束,唯一性约束即如果表中有(100,“2000-10-10”)(100为事务id,后面为时间),因为以事务id为主键,此时不能向数据库中插入同一主键的信息,即这个事务id——100只能向数据库插入一次。在这里就是根据这个插入唯一性来保证幂等的。
当Bank2向表中插入数据时,假如有两个线程同时执行到了confirm方法,此时要线程A、B都判断数据库中有没有操作这个事务的记录,我们这时假设两个线程同时都发现没有,此时A继续执行下面的代码,然后将该事务id插入了表中并插入成功,而当线程B要去插入时,原本前面判断是没有记录的,可此时由于比A慢了一点点,所以这时B进行插入则会失败,插入失败B线程就会发生回滚。所以也就保证了只有一个线程操作成功,即幂等性保证。
https://blog.csdn.net/zhengren964/article/details/104229961
当然幂等性保证还可以根据redis、token等等的机制去实现
小结
如果拿TCC事务的处理流程与2PC两阶段提交做比较,2PC通常都是在跨库的DB层面,而TCC则在应用层面的处 理,需要通过业务逻辑来实现。
这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使 得降低锁冲突、提高吞吐量成为可能。
而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现try、confirm、cancel三个操作。此 外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。
对于TCC的一些其他的概念可以看:https://www.cnblogs.com/jajian/p/10014145.html
https://blog.csdn.net/varyall/article/details/94980276?biz_id=102&utm_term=tcc%E4%BA%8B%E5%8A%A1&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-0-94980276&spm=1018.2118.3001.4449
|