java分布式事务JTA

论坛 期权论坛 脚本     
匿名网站用户   2020-12-21 09:34   131   0

什么是事务处理

事务是计算机应用中不可或缺的组件模型,它保证了用户操作的原子性 ( Atomicity )、一致性 ( Consistency )、隔离性 ( Isolation ) 和持久性 ( Durabilily )。关于事务最经典的示例莫过于信用卡转账:将用户 A 账户中的 500 元人民币转移到用户 B 的账户中,其操作流程如下
1. 将 A 账户中的金额减少 500
2. 将 B 账户中的金额增加 500
这两个操作必须保正 ACID 的事务属性:即要么全部成功,要么全部失败;假若没有事务保障,用户的账号金额将可能发生问题:
假如第一步操作成功而第二步失败,那么用户 A 账户中的金额将就减少 500 元而用户 B 的账号却没有任何增加(不翼而飞);同样如果第一步出错 而第二步成功,那么用户 A 的账户金额不变而用户 B 的账号将增加 500 元(凭空而生)。上述任何一种错误都会产生严重的数据不一致问题,事务的缺失对于一个稳定的生产系统是不可接受的。

Java事务API(JTA:Java Transaction API)和它的同胞Java事务服务(JTS:Java Transaction Service),为J2EE平台提供了分布式事务服务(distributed transaction)。
一个分布式事务(distributed transaction)包括一个事务管理器(transaction manager)和一个或多个资源管理器(resource manager)。
一个资源管理器(resource manager)是任意类型的持久化数据存储。
事务管理器(transaction manager)承担着所有事务参与单元者的相互通讯的责任。

JTA事务比JDBC事务更强大。一个JTA事务可以有多个参与者,而一个JDBC事务则被限定在一个单一的数据库连接。

分布式事务(Distributed Transaction)包括事务管理器(Transaction Manager)和一个或多个支持 XA 协议的资源管理器 ( Resource Manager )。我们可以将资源管理器看做任意类型的持久化数据存储;事务管理器承担着所有事务参与单元的协调与控制。

XA协议由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2和Sybase等各大数据库厂家都提供对XA的支持。XA协议采用两阶段提交方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口。XA协议包括两套函数,以xa_开头的及以ax_开头的。

java的分布式配置要用到Atomikos
Atomikos 是一个为Java平台提供增值服务的并且开源类事务管理器。

Atomikos分两个:一个是开源的TransactionEssentials,一个是商业的ExtremeTransactions。
TransactionEssentials的主要特征:JTA/XA 事务管理 —— 提供事务管理和连接池不需要应用服务器 —— TransactionEssentials可以在任何Java EE应用服务器中运行,也就是不依赖于任何应用服务器开源 —— TransactionEssentials是遵守Apache版本2许可的开源软件专注于JDBC/JMS —— 支持所有XA资源,但是资源池和消息监听是专供JDBC和JMS的与Spring 和 Hibernate 集成 —— 提供了描述如何与Spring和Hibernate集成的文档
一、环境

spring 2

ibatis2

AtomikosTransactionsEssentials-3.7.0 下载地址:http://www.atomikos.com/Main/InstallingTransactionsEssentials

MySQL-5.1 :数据库引擎为InnoDB,只有这样才能支持事务

JDK1.6

Oracle10

二 jar包

Atomikos jar必须包 transactions-jdbc.jar

transactions-jta.jar

transactions.jar

atomikos-util.jar

transactions-api.jar

ibatis,spring

只要符合二者集成jar组合即可,无额外jar

一下是具体代码实践:不需要修改代码,只要修改数据源的配置文件即可
pom.xml

<!--atomikos -->
        <dependency>
            <groupId>com.atomikos</groupId>
            <artifactId>transactions-jdbc</artifactId>
        </dependency>

        <dependency>
            <groupId>javax.transaction</groupId>
            <artifactId>jta</artifactId>
        </dependency>

dataSource.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="  
        http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd   
        http://www.springframework.org/schema/tx   
        http://www.springframework.org/schema/tx/spring-tx-4.1.xsd  
        http://www.springframework.org/schema/aop   
        http://www.springframework.org/schema/aop/spring-aop-4.1.xsd  
        ">
    <!--oracle数据源 -->
    <bean id="dataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean"
        init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="ds1" />
        <property name="xaDataSourceClassName" value="oracle.jdbc.xa.client.OracleXADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="URL">${mall.settlement.oracle.jdbc.url}</prop>
                <prop key="user">${mall.settlement.oracle.jdbc.usrename}</prop>
                <prop key="password">${mall.settlement.oracle.jdbc.password}</prop>
            </props>
        </property>
        <property name="minPoolSize" value="0" /> <!-- 连接池中保留的最小连接数 -->
        <property name="maxPoolSize" value="100" /><!-- 连接池中保留的最大连接数 -->
        <property name="borrowConnectionTimeout" value="1" />  <!--获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回 -->
        <property name="maintenanceInterval" value="60" /><!--连接回收时间 -->
        <property name="maxIdleTime" value="60" /> <!-- 最大空闲时间,60秒内未使用则连接被丢弃。若为0则永不丢弃。Default: 0 -->
        <property name="loginTimeout" value="60" />     <!--java数据库连接池,最大可等待获取datasouce的时间 -->
        <!-- set a SQL for testing connection -->
        <property name="testQuery" value="${mall.settlement.oracle.query}" />
    </bean>
    <!-- myBatis文件 -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSource" />
        <!-- 显式指定Mapper文件位置 -->
        <property name="mapperLocations" value="classpath:/oracleMybatisMapper/**/*Mapper.xml" />
        <property name="configLocation" value="classpath:mybatis-config.xml"></property>
    </bean>
    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage"
            value="com.boss.settlement.service.mapper.oracle" />
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
    </bean>


    <bean id="dataSourceDB2" class="com.atomikos.jdbc.AtomikosDataSourceBean"
        init-method="init" destroy-method="close">
        <property name="uniqueResourceName" value="ds2" />
        <property name="xaDataSourceClassName" value="com.ibm.db2.jcc.DB2XADataSource" />
        <property name="xaProperties">
            <props>
                <prop key="serverName">${mall.settlement.db2.serverName}</prop>
                <prop key="portNumber">${mall.settlement.db2.portNumber}</prop>
                <prop key="databaseName">${mall.settlement.db2.databaseName}</prop>
                <prop key="user">${mall.settlement.db2.jdbc.usrename}</prop>
                <prop key="password">${mall.settlement.db2.jdbc.password}</prop>
                <prop key="driverType">4</prop>
            </props>
        </property>
        <property name="minPoolSize" value="0" /> <!-- 连接池中保留的最小连接数 -->
        <property name="maxPoolSize" value="100" /><!-- 连接池中保留的最大连接数 -->
        <property name="borrowConnectionTimeout" value="1" />  <!--获取连接失败重新获等待最大时间,在这个时间内如果有可用连接,将返回 -->
        <property name="maintenanceInterval" value="60" /><!--连接回收时间 -->
        <property name="maxIdleTime" value="60" /> <!-- 最大空闲时间,60秒内未使用则连接被丢弃。若为0则永不丢弃。Default: 0 -->
        <property name="loginTimeout" value="60" />     <!--java数据库连接池,最大可等待获取datasouce的时间 -->
        <!-- set a SQL for testing connection -->
        <property name="testQuery" value="${mall.settlement.db2.query}" />
    </bean>

    <!-- myBatis文件 -->
    <bean id="sqlSessionFactoryDB2" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSourceDB2" />
        <!-- 显式指定Mapper文件位置 -->
        <property name="mapperLocations" value="classpath:/db2MybaitsMapper/**/*Mapper.xml" />
        <property name="configLocation" value="classpath:mybatis-config.xml"></property>
    </bean>

    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage"
            value="com.boss.settlement.service.mapper.db2" />
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryDB2" />
    </bean>

    <bean id="transactionManager"
        class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager">
            <bean class="com.atomikos.icatch.jta.UserTransactionManager"
                init-method="init" destroy-method="close">
                <property name="forceShutdown" value="true" />
            </bean>
        </property>
        <property name="userTransaction">
            <bean class="com.atomikos.icatch.jta.UserTransactionImp" />
        </property>
        <property name="allowCustomIsolationLevels" value="true" />
    </bean>

    <tx:annotation-driven />
</beans>  

java代码:

        @Transactional
    @Override
    @SuppressWarnings("unchecked")
    public Map<String, Object> fundTransferJTA(String loginName, FundTransfer transfer, String channelType) {
        Map<String, Object> resultMap = new HashMap<String, Object>();
        try {
            // 转账审核通过
            transfer.setCheckDate(new Date());
            transfer.setCheckStatus(CheckStatusEnum.CHECK_STATUS_Y.getKey());
            transfer.setCheckUser(loginName);
            fundTransferMapper.updateFundTransfer(transfer);
            // 划转资金
            Map<String, Object> fundMidMap = new HashMap<String, Object>();
            StringBuffer str = new StringBuffer();
            str.append("TRANSFER_ID=" + transfer.getTransferId());
            fundMidMap.put("in_para", str.toString());
            fundMidProcedureMapper.pdFundTransfer(fundMidMap);
            PdFundMidCrtRollDto pdFundMidCrtRollDto = ((List<PdFundMidCrtRollDto>) fundMidMap.get("out_pay_result")).get(0);
            String batchNoStr = pdFundMidCrtRollDto.getBatchNo();
            if (StringUtils.isBlank(batchNoStr)) {
                resultMap.put("msg", "划入货款/划出货款返回批次为空");
                resultMap.put("isSuccess", false);
                return resultMap;
            }
            Map<String, Object> payMap = payChgCalcJTA(batchNoStr, channelType);
            if (!(boolean) payMap.get("isSuccess")) {
                // 手动回滚
                TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
                resultMap.put("msg", payMap.get("msg"));
                resultMap.put("isSuccess", false);
                return resultMap;
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            // 手动回滚
            TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
            resultMap.put("msg", ErrorMessageUtil.getErrorMessage(e));
            resultMap.put("isSuccess", false);
            return resultMap;
        }
        resultMap.put("isSuccess", true);
        return resultMap;
    }
分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1136255
帖子:227251
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP