RockerMq之事务消息

论坛 期权论坛 脚本     
匿名技术用户   2021-1-1 23:18   117   0

本篇博客的RocketMq版本为4.3.0,首先附上RocketMq事务消息的流程图,以下的实践也都是基于此流程图

生产端项目

RocketMqConfig配置生产者

package com.yj.producer.config;

import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.yj.producer.listener.MyTransactionListener;

@Configuration
public class RocketMqConfig {

 @Value("${rocketmq.producer.groupName}")
 private String producerGroupName;

 @Value("${rocketmq.namesrv.addr}")
 private String nameServerAddress;

 @Bean(initMethod = "start", destroyMethod = "shutdown")
 public TransactionMQProducer producer(MyTransactionListener myTransactionListener) {
  TransactionMQProducer producer = new TransactionMQProducer(producerGroupName);
  // VipChannel阿里内部使用版本才用,开源版本没有,默认为true,占用10909端口,此时虚拟机需要开放10909端口,否则会报
  // :connect to <:10909> failed异常,可以直接设置为false
  // producer.setVipChannelEnabled(false);
  producer.setNamesrvAddr(nameServerAddress);
  producer.setTransactionListener(myTransactionListener);
  return producer;
 }
}

MyTransactionListener事务监听器,主要有两个方法,一个执行本地事务的方法,一个提供给MqServer进行消息状态回查的方法,对于上面的流程图中的步骤③和⑥

package com.yj.producer.listener;

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.yj.producer.model.MsgStatus;
import com.yj.producer.service.AccountService;

@Component
public class MyTransactionListener implements TransactionListener {

 private static final Logger log = LoggerFactory.getLogger(MyTransactionListener.class);

 @Autowired
 private AccountService accountService;

 private ConcurrentHashMap<String, Integer> transStatusMap = new ConcurrentHashMap<>();

 @Override
 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  String transactionId = msg.getTransactionId();
  log.info("准备开始更新账户,当前状态为:"+MsgStatus.UNKNOW);
  transStatusMap.put(transactionId, MsgStatus.UNKNOW.getStatus());

  HashMap<String, Object> map = (HashMap<String, Object>) arg;
  long userId = (long) map.get("user_id");
  long productId = (long) map.get("product_id");
  String errorFlag = (String) map.get("errorFlag");
  String timeFlag = (String) map.get("timeFlag");
  try {
   accountService.updateAccount(userId, productId, errorFlag,timeFlag);
   log.info("更新账户成功");
  } catch (Exception e) {
   log.info("更新账户发生异常,事务回滚,状态为:"+MsgStatus.ROLLBACK);
   transStatusMap.put(transactionId, MsgStatus.ROLLBACK.getStatus());
   return LocalTransactionState.ROLLBACK_MESSAGE;
  }
  log.info("更新账户成功,状态为:"+MsgStatus.COMMIT);
  transStatusMap.put(transactionId, MsgStatus.COMMIT.getStatus());
  return LocalTransactionState.COMMIT_MESSAGE;
 }

 @Override
 public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  String transactionId = msg.getTransactionId();
  Integer status = transStatusMap.get(transactionId);
  if (status == MsgStatus.COMMIT.getStatus()) {
   log.info("消息回查,状态为:"+MsgStatus.COMMIT);
   return LocalTransactionState.COMMIT_MESSAGE;
  } else if (status == MsgStatus.ROLLBACK.getStatus()) {
   log.info("消息回查,状态为:"+MsgStatus.ROLLBACK);
   return LocalTransactionState.ROLLBACK_MESSAGE;
  }
  log.info("消息回查,状态为:"+MsgStatus.UNKNOW);
  return LocalTransactionState.UNKNOW;
 }
}

自定义MQProducer封装生产者发送事务消息的方法

package com.yj.producer.producer;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.yj.producer.model.BusinessException;

@Component
public class MQProducer {

 @Autowired
 private TransactionMQProducer producer;

 public SendResult sendTranscationMsg(Message message,Object arg) throws BusinessException {
  SendResult result = null;
  try {
   result = producer.sendMessageInTransaction(message, arg);
  } catch (Exception e) {
   e.printStackTrace();
   throw new BusinessException("发送事务消息发生异常");
  }
  return result;
 }
}

AccountService

package com.yj.producer.service;

import java.util.HashMap;
import org.apache.rocketmq.common.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.alibaba.fastjson.JSON;
import com.yj.producer.dao.AccountDao;
import com.yj.producer.model.Storage;
import com.yj.producer.producer.MQProducer;

@Service
public class AccountService {

 private static final Logger log = LoggerFactory.getLogger(AccountService.class);

 @Autowired
 private AccountDao accountDao;
 
 @Autowired
 private MQProducer producer;
 
    @Value("${rocketmq.topic}")
    private String topic;

 /**
  * 本地事务
  */
 @Transactional
 public String updateAccount(long userId, long productId, String errorFlag,String timeFlag) {
  business(userId, productId, errorFlag,timeFlag);
  return "下单成功";
 }
 
 public String mqTransaction(long userId, long productId, String errorFlag,String timeFlag) {
  Message message = new Message();
  message.setTopic(topic);
  
  Storage storage=new Storage();
  storage.setProductId(productId);
  String storageStr = JSON.toJSONString(storage);
  message.setBody(storageStr.getBytes());
  message.setTags(productId + "_tag");
  
  HashMap<String,Object> arg=new HashMap<>();
  arg.put("user_id", userId);
  arg.put("product_id", productId);
  arg.put("errorFlag", errorFlag);
  arg.put("timeFlag", timeFlag);
  producer.sendTranscationMsg(message,arg);
  return "下单成功";
 }

 private void business(long userId, long productId, String errorFlag,String timeFlag) {
  log.info("==账户服务扣减账户余额开始==");
  accountDao.updateAccount(userId);
  if ("1".equals(errorFlag)) {
   int i = 1 / 0;
  }
  if ("1".equals(timeFlag)) {
   try {
    Thread.sleep(120000L);
   } catch (InterruptedException e) {
    e.printStackTrace();
   }
  }
  log.info("==账户服务扣减账户余额结束==");
 }
}

AccountController

package com.yj.producer.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.yj.producer.service.AccountService;

@RestController
public class AccountController {

 @Autowired
 private AccountService accountService;

 @PostMapping("/mqTransaction")
 public String mqTransaction(@RequestParam("user_id") long userId, @RequestParam("product_id") long productId,
   @RequestParam("errorFlag") String errorFlag, @RequestParam("timeFlag") String timeFlag) {
  return accountService.mqTransaction(userId, productId, errorFlag, timeFlag);
 }
}

MsgStatus

package com.yj.producer.model;

public enum MsgStatus {
 COMMIT(1,"提交"),
 ROLLBACK(2,"回滚"),
 UNKNOW(3,"未知");
 private int status;
 private String msg;
 private MsgStatus(int status, String msg) {
  this.status = status;
  this.msg = msg;
 }
 public int getStatus() {
  return status;
 }
 public String getMsg() {
  return msg;
 }
}

消费端项目

消费端的配置没有什么特别的地方,可以参考mq顺序消息的消费者的配置,消费端监听到消息后,执行本地事务

package com.yj.consumer.consumer;

import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import com.yj.consumer.model.Storage;
import com.yj.consumer.service.StorageService;

@Service
public class MessageProcessor {
 
 @Value("${server.port}")  
    private String port; 
 
 @Autowired
 private StorageService storageService;

 private static final Logger LOGGER = LoggerFactory.getLogger(MessageProcessor.class);

 public Boolean handleMessage(MessageExt messageExt) {
  try {
   String body = null;
   // 消费者消费
   if (null == messageExt || null == messageExt.getBody()) {
    LOGGER.info("消息体为空");
    return false;
   }
   body = new String(messageExt.getBody());
   Storage storage = JSON.parseObject(body, Storage.class);
   LOGGER.info("port:"+port+",消费Tag:" + messageExt.getTags() + ",storage:" + storage);
   storageService.updateStorage(storage.getProductId());
  } catch (Exception e) {
   e.printStackTrace();
   LOGGER.error(e.getMessage(), e);
   return false;
  }
  return true;
 }
}

StorageService

package com.yj.consumer.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.yj.consumer.dao.StorageDao;

@Service
public class StorageService {

 private static final Logger log = LoggerFactory.getLogger(StorageService.class);
  
 @Autowired
    private StorageDao storageDao;
 
  /**
     * 扣减库存
     */
 @Transactional
    public void updateStorage(long productId) {
  log.info("==库存服务扣减库存开始==");
        try {
   storageDao.updateStorage(productId);
   //int i=1/0;
  } catch (Exception e) {
   log.info("消费方消费时发生异常,此时需人工介入处理");
   e.printStackTrace();
  }
        log.info("==库存服务扣库存结束==");
    }
}

启动MQ环境,开始验证

①生产端,消费端均正常的情况

http://127.0.0.1:8001/mqTransaction?user_id=1&product_id=1&errorFlag=0&timeFlag=0

生产方日志

2020-01-28 16:12:34.859  INFO 12580 --- [nio-8001-exec-1] c.y.p.listener.MyTransactionListener     : 准备开始更新账户,当前状态为:UNKNOW
2020-01-28 16:12:35.142  INFO 12580 --- [nio-8001-exec-1] com.yj.producer.service.AccountService   : ==账户服务扣减账户余额开始==
2020-01-28 16:12:35.163 DEBUG 12580 --- [nio-8001-exec-1] c.y.p.dao.AccountDao.updateAccount       : ==>  Preparing: update account set money = money-10 where user_id = ? 
2020-01-28 16:12:35.266 DEBUG 12580 --- [nio-8001-exec-1] c.y.p.dao.AccountDao.updateAccount       : ==> Parameters: 1(Long)
2020-01-28 16:12:35.270 DEBUG 12580 --- [nio-8001-exec-1] c.y.p.dao.AccountDao.updateAccount       : <==    Updates: 1
2020-01-28 16:12:35.270  INFO 12580 --- [nio-8001-exec-1] com.yj.producer.service.AccountService   : ==账户服务扣减账户余额结束==
2020-01-28 16:12:35.276  INFO 12580 --- [nio-8001-exec-1] c.y.p.listener.MyTransactionListener     : 更新账户成功
2020-01-28 16:12:35.276  INFO 12580 --- [nio-8001-exec-1] c.y.p.listener.MyTransactionListener     : 更新账户成功,状态为:COMMIT

消费方日志

2020-01-28 16:12:35.560  INFO 15628 --- [MessageThread_1] com.yj.consumer.service.StorageService   : ==库存服务扣减库存开始==
2020-01-28 16:12:35.575 DEBUG 15628 --- [MessageThread_1] c.y.c.dao.StorageDao.updateStorage       : ==>  Preparing: update storage set num = num-1 where product_id = ? 
2020-01-28 16:12:35.675 DEBUG 15628 --- [MessageThread_1] c.y.c.dao.StorageDao.updateStorage       : ==> Parameters: 1(Long)
2020-01-28 16:12:35.682 DEBUG 15628 --- [MessageThread_1] c.y.c.dao.StorageDao.updateStorage       : <==    Updates: 1
2020-01-28 16:12:35.682  INFO 15628 --- [MessageThread_1] com.yj.consumer.service.StorageService   : ==库存服务扣库存结束==

业务正常执行

②生产端执行异常的情况

http://127.0.0.1:8001/mqTransaction?user_id=1&product_id=1&errorFlag=1&timeFlag=0

生产端日志

2020-01-28 16:14:58.768  INFO 12580 --- [nio-8001-exec-3] c.y.p.listener.MyTransactionListener     : 准备开始更新账户,当前状态为:UNKNOW
2020-01-28 16:14:58.782  INFO 12580 --- [nio-8001-exec-3] com.yj.producer.service.AccountService   : ==账户服务扣减账户余额开始==
2020-01-28 16:14:58.782 DEBUG 12580 --- [nio-8001-exec-3] c.y.p.dao.AccountDao.updateAccount       : ==>  Preparing: update account set money = money-10 where user_id = ? 
2020-01-28 16:14:58.783 DEBUG 12580 --- [nio-8001-exec-3] c.y.p.dao.AccountDao.updateAccount       : ==> Parameters: 1(Long)
2020-01-28 16:14:58.786 DEBUG 12580 --- [nio-8001-exec-3] c.y.p.dao.AccountDao.updateAccount       : <==    Updates: 1
2020-01-28 16:14:58.813  INFO 12580 --- [nio-8001-exec-3] c.y.p.listener.MyTransactionListener     : 更新账户发生异常,事务回滚,状态为:ROLLBACK

消费端没有接收到消息,无日志输出

③生产端执行超时的情况

生产端日志,发现进行了消息的回查

2020-01-28 16:18:57.914  INFO 3852 --- [nio-8001-exec-1] c.y.p.listener.MyTransactionListener     : 准备开始更新账户,当前状态为:UNKNOW
2020-01-28 16:18:58.222  INFO 3852 --- [nio-8001-exec-1] com.yj.producer.service.AccountService   : ==账户服务扣减账户余额开始==
2020-01-28 16:18:58.238 DEBUG 3852 --- [nio-8001-exec-1] c.y.p.dao.AccountDao.updateAccount       : ==>  Preparing: update account set money = money-10 where user_id = ? 
2020-01-28 16:18:58.367 DEBUG 3852 --- [nio-8001-exec-1] c.y.p.dao.AccountDao.updateAccount       : ==> Parameters: 1(Long)
2020-01-28 16:18:58.371 DEBUG 3852 --- [nio-8001-exec-1] c.y.p.dao.AccountDao.updateAccount       : <==    Updates: 1
2020-01-28 16:20:16.033  INFO 3852 --- [pool-1-thread-1] c.y.p.listener.MyTransactionListener     : 消息回查,状态为:UNKNOW
2020-01-28 16:20:57.938  INFO 3852 --- [lientSelector_1] RocketmqRemoting                         : closeChannel: close the connection to remote address[192.168.190.131:10909] result: true
2020-01-28 16:20:58.373  INFO 3852 --- [nio-8001-exec-1] com.yj.producer.service.AccountService   : ==账户服务扣减账户余额结束==
2020-01-28 16:20:58.376  INFO 3852 --- [nio-8001-exec-1] c.y.p.listener.MyTransactionListener     : 更新账户成功
2020-01-28 16:20:58.376  INFO 3852 --- [nio-8001-exec-1] c.y.p.listener.MyTransactionListener     : 更新账户成功,状态为:COMMIT

消费端日志,超时结束后,消费端,消费了消息

2020-01-28 16:20:58.382  INFO 15628 --- [MessageThread_2] com.yj.consumer.service.StorageService   : ==库存服务扣减库存开始==
2020-01-28 16:20:58.383 DEBUG 15628 --- [MessageThread_2] c.y.c.dao.StorageDao.updateStorage       : ==>  Preparing: update storage set num = num-1 where product_id = ? 
2020-01-28 16:20:58.383 DEBUG 15628 --- [MessageThread_2] c.y.c.dao.StorageDao.updateStorage       : ==> Parameters: 1(Long)
2020-01-28 16:20:58.388 DEBUG 15628 --- [MessageThread_2] c.y.c.dao.StorageDao.updateStorage       : <==    Updates: 1
2020-01-28 16:20:58.388  INFO 15628 --- [MessageThread_2] com.yj.consumer.service.StorageService   : ==库存服务扣库存结束==
2020-01-28 16:21:16.052  INFO 15628 --- [MessageThread_3] c.yj.consumer.consumer.MessageProcessor  : port:8004,消费Tag:1_tag,storage:Storage(id=0, productId=1, num=null)
2020-01-28 16:21:16.053  INFO 15628 --- [MessageThread_3] com.yj.consumer.service.StorageService   : ==库存服务扣减库存开始==
2020-01-28 16:21:16.053 DEBUG 15628 --- [MessageThread_3] c.y.c.dao.StorageDao.updateStorage       : ==>  Preparing: update storage set num = num-1 where product_id = ? 
2020-01-28 16:21:16.053 DEBUG 15628 --- [MessageThread_3] c.y.c.dao.StorageDao.updateStorage       : ==> Parameters: 1(Long)
2020-01-28 16:21:16.057 DEBUG 15628 --- [MessageThread_3] c.y.c.dao.StorageDao.updateStorage       : <==    Updates: 1
2020-01-28 16:21:16.057  INFO 15628 --- [MessageThread_3] com.yj.consumer.service.StorageService   : ==库存服务扣库存结束==

④消费端异常的情况

消费端异常的情况下,需人工介入处理。从工程实践角度讲,这种整个流程自动回滚的代价是非常巨大的,不但实现复杂,还会引入新的问题。比如自动回滚失败,又怎么处理?对应这种极低概率的case,采取人工处理,会比实现一个高复杂的自动化回滚系统,更加可靠,也更加简单。

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP