RocketMQ如何解决分布式事务
本篇内容主要讲解“RocketMQ如何解决分布式事务”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“RocketMQ如何解决分布式事务”吧!
成都创新互联公司是专业的汉川网站建设公司,汉川接单;提供做网站、网站设计,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行汉川网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
一致性如何保证:
RocketMQ解决分布式事务(可靠消息最终一致性方案)
1、A系统发送一个prepared消息到MQ,如果这个prepared消息发送失败那么就直接取消操作别执行了。
2、如果这个消息发送成功了、就接着执行本地事务(executeLocalTransaction),如果成功就告诉MQ发送确认消息,如果失败,就告诉MQ发送回滚消息。
3、如果发送了确认消息、那么B系统会接收到确认消息,然后执行本地事务。
4、上面的第2步, 由于网络原因发送确认or回滚消息失败,但是broker有轮询机制,根据唯一id查询本地事务状态,MQ会自动定时轮询所有prepared消息回调你的接口(checkLocalTransaction),问你,这个消息是不是本地事务处理失败了,所有没有发送确认的消息,是继续重试还是回滚?一版来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,而确认消息却发送失败了。
PS:此方案是不支持事务发起服务进行回滚的,但是大部分互联网应用都不会要求事务发起方进行回滚,如果一定要事务发起方进行回滚应该采用2PC、3PC、TCC等强一致性方案来实现分布式事务,比如LCN。
订单-库存-分布式事务
这里通过一个实例来讲一下RocketMQ实现分布式事务具体编码。
场景: 下单场景,订单服务生成订单,当该订单支付成功之后,修改订单状态已支付,并且要通知库存服务进行库存的扣减。
数据库设计:
CREATE TABLE `yzy_order` ( `id` int(11) NOT NULL, `order_id` varchar(100) NOT NULL DEFAULT '' COMMENT '订单id', `buy_num` int(11) DEFAULT NULL COMMENT '购买数量', `good_id` int(11) DEFAULT NULL COMMENT '商品ID', `user_id` int(11) DEFAULT NULL COMMENT '用户ID', `pay_status` int(11) DEFAULT NULL COMMENT '支付状态,0:没有支付,1:已经支付', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci CREATE TABLE `yzy_repo` ( `id` int(11) NOT NULL AUTO_INCREMENT, `good_name` varchar(100) NOT NULL DEFAULT '' COMMENT '商品名称', `num` int(11) NOT NULL DEFAULT '0' COMMENT '库存数量', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='测试,库存表表'
开始实战
订单服务service的主要方法
package com.transaction.order; import com.alibaba.dubbo.config.annotation.Reference; import com.transaction.repository.IRepositoryService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.List; @Service public class OrderService { @Autowired OrderDao orderDao; public final int PAY_DONE = 1; /** * 检查订单是否存在并且状态是支付完成 **/ public boolean checkOrderPaySuccess(String orderId){ ListallOrders = orderDao.findAll(); return allOrders.stream() .anyMatch(order -> order.getOrderId().equals(orderId) && order.getPayStatus() == PAY_DONE); } /** * 更新订单是为支付完成 **/ public void updatePayStatusByOrderId(String orderId){ orderDao.updatePayStatusByOrderId(orderId, PAY_DONE); } /** * 生成订单,状态默认是未支付 **/ public void save(String orderId, int num, int goodId,int userId) { YzyOrder yzyOrder = new YzyOrder(); yzyOrder.setOrderId(orderId); yzyOrder.setBuyNum(num); yzyOrder.setGoodId(goodId); yzyOrder.setUserId(userId); orderDao.save(yzyOrder); } }
业务流程
1.在订单表创建一个状态是未支付的订单
在终端或者浏览器 执行 curl '127.0.0.1:8081/order/save?num=2&good_id=1&user_id=1001'
/** * 生成订单接口 * @param num * @param goodId * @param userId * @return */ @GetMapping("save") public String makeOrder( @RequestParam("num") int num, @RequestParam("good_id") int goodId, @RequestParam("user_id") int userId) { orderService.save(UUID.randomUUID().toString(), num, goodId,userId); return "success"; }
2.用户支付完成,通过MQ通知库存服务扣减库存
OrderController:pay 发送订单支付成功的MQ事务消息,这里注意体会,并不是直接调用OrderService::updatePayStatusByOrderId 然后发送普通的MQ消息。而是先发送事务消息到MQ,然后MQ回调订单服务的TransactionListener::executeLocalTransaction,在这里完成订单状态的更新,保证发送事务消息和更新订单状态的一致性.
@GetMapping("pay") public String pay(@RequestParam("order_id") String orderId) throws UnsupportedEncodingException, MQClientException, JsonProcessingException { transactionProducer.sendOrderPaySucessEvent(orderId); return "success"; }
3.订单服务端的事务消息监听器
@Component public class TransactionProducer implements InitializingBean { private TransactionMQProducer producer; @Autowired private OrderService orderService; @Autowired private OrderDao orderDao; @Override public void afterPropertiesSet() throws Exception { producer = new TransactionMQProducer("order-pay-group"); producer.setNamesrvAddr("mq01.stag.kk.srv:9876;mq02.stag.kk.srv:9876"); ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("transaction-thread-name-%s").build(); ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30), threadFactory); producer.setExecutorService(executor); //设置发送消息的回调 producer.setTransactionListener(new TransactionListener() { /** * 根据消息发送的结果 判断是否执行本地事务 * * 回调该方法的时候说明 消息已经成功发送到了MQ,可以把订单状态更新为 "支付成功" */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { // 根据本地事务执行成与否判断 事务消息是否需要commit与 rollback ObjectMapper objectMapper = new ObjectMapper(); LocalTransactionState state = LocalTransactionState.UNKNOW; try { OrderRecord record = objectMapper.readValue(msg.getBody(), OrderRecord.class); //MQ已经收到了TransactionProducer send方法发送的事务消息,下面执行本地的事务 //本地记录订单信息 orderService.updatePayStatusByOrderId(record.getOrderId()); state = LocalTransactionState.COMMIT_MESSAGE; } catch (UnsupportedEncodingException e) { e.printStackTrace(); state = LocalTransactionState.ROLLBACK_MESSAGE; } catch (IOException e) { e.printStackTrace(); state = LocalTransactionState.ROLLBACK_MESSAGE; } return state; } /** * RocketMQ 回调 根据本地事务是否执行成功 告诉broker 此消息是否投递成功 * @return */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { ObjectMapper objectMapper = new ObjectMapper(); LocalTransactionState state = LocalTransactionState.UNKNOW; OrderRecord record = null; try { record = objectMapper.readValue(msg.getBody(), OrderRecord.class); } catch (IOException e) { e.printStackTrace(); } try { //根据是否有transaction_id对应转账记录 来判断事务是否执行成功 boolean isLocalSuccess = orderService.checkOrderPaySuccess(record.getOrderId()); if (isLocalSuccess) { state = LocalTransactionState.COMMIT_MESSAGE; } else { state = LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { e.printStackTrace(); } return state; } }); producer.start(); } public void sendOrderPaySucessEvent(String orderId) throws JsonProcessingException, UnsupportedEncodingException, MQClientException { ObjectMapper objectMapper = new ObjectMapper(); YzyOrder order = orderDao.findAll().stream() .filter(item->item.getOrderId().equals(orderId)) .collect(Collectors.toList()).get(0); if(order == null){ System.out.println("not found order " + orderId); } // 构造发送的事务 消息 OrderRecord record = new OrderRecord(); record.setUserId(order.getUserId()); record.setOrderId(orderId); record.setBuyNum(order.getBuyNum()); record.setPayStatus(order.getPayStatus()); record.setGoodId(order.getGoodId()); Message message = new Message("Order-Success", "", record.getOrderId(), objectMapper.writeValueAsString(record).getBytes(RemotingHelper.DEFAULT_CHARSET)); TransactionSendResult result = producer.sendMessageInTransaction(message, null); System.out.println("发送事务消息 ,orderId = " + record.getOrderId() + " " + result.toString()); } }
4.库存服务扣减库存
需要注意的问题:
1. 扣减库存要防止在并发的情况下被扣成负数
2. 先select后update的方式更新库存要加分布式锁或者数据库乐观锁,update语句需要是幂等的
UPDATE t_yue SET money=$new_money WHERE id=$good_id AND money=$old_money;
3. 注意通过msgId或者orderId来进行消费幂等处理
@Override public int reduce(Integer buyNum, Integer goodId) { //并发的情况下,为了防止库存被扣成负数,有三种解决方案 //1. select for update (必须放到事务中) //2. 这段逻辑加上分布式锁 //3. 数据库加上一个version字段,乐观锁 while (true){ OptionalrepoOption = repositoryDao.findById(goodId); if (!repoOption.isPresent()) { return 0; } YzyRepo repo = repoOption.get(); //避免数据库库存扣减小于零 if (repo.getNum() - buyNum < 0) { return -1; } repo.setNum(repo.getNum() - buyNum); int affect = repositoryDao.updateGoodNum(repo.getNum() - buyNum, repo.getNum(), goodId); if(affect > 0){ return affect; } } }
到此,相信大家对“RocketMQ如何解决分布式事务”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
文章标题:RocketMQ如何解决分布式事务
URL标题:http://pwwzsj.com/article/jdpgee.html