RabbitMQ用多路由,多队列来破除流控
本篇内容主要讲解“RabbitMQ用多路由,多队列来破除流控”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“RabbitMQ用多路由,多队列来破除流控”吧!
目前创新互联已为千余家的企业提供了网站建设、域名、虚拟空间、成都网站托管、企业网站设计、婺城网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
流控机制是我们在使用RabbitMQ最头疼的问题,一旦并发激增时,消费者消费队列消息就像滴水一样慢。
现在我们下单后,需要给通知中心发送消息,让通知中心通知服务商收取订单,并确认提供服务。
我们先给Order接口添加一个发送消息的方法。
public interface Order {public void makeOrder(Order order); public OrderSuccessResult getResult(Order order); public void postOrder(Order order);}
实现类实现该方法
@Data@AllArgsConstructor@NoArgsConstructor@ServiceOrderVersion(value = 1)@RequiredArgsConstructorpublic class ServiceOrder extends AbstractOrder {private Long id; @NonNull private String code; @NonNull private Store store; @NonNull private ProviderService service; @NonNull private Car car; @NonNull private Date serviceDate; @NonNull private String contact; @NonNull private String contactTel; private AppUser user; @NonNull private String content; private int status; private Date createDate; @Override public void makeOrder(Order order) { ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class); IdService idService = SpringBootUtil.getBean(IdService.class); ((ServiceOrder)order).setId(idService.genId()); ((ServiceOrder)order).setCode(getCodeInfo(idService)); AppUser loginAppUser = AppUserUtil.getLoginAppUser(); AppUser user = new AppUser(); user.setId(loginAppUser.getId()); user.setUsername(loginAppUser.getUsername()); ((ServiceOrder)order).setUser(user); ((ServiceOrder)order).setStatus(1); ((ServiceOrder)order).setCreateDate(new Date()); serviceOrderDao.save((ServiceOrder) order); }@Override public OrderSuccessResult getResult(Order order) { ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class); this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult(); return this.orderSuccessResult.getResult(order); }@Override public void postOrder(Order order) { MessageSender sender = SpringBootUtil.getBean(MessageSender.class); CompletableFuture.runAsync(() ->sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER, OwnerCarCenterMq.ROUTING_KEY_ORDER, order) ); }private String getCodeInfo(IdService idService) { String flow = String.valueOf(idService.genId()); flow = flow.substring(14,flow.length()); String pre = DateUtils.format(new Date(), DateUtils.pattern9); return pre + flow; } }
其中我们定义了这么一组队列名,交换机,和路由
public interface OwnerCarCenterMq {/** * 队列名 */ String ORDER_QUEUE = "order"; /** * 服务系统exchange名 */ String MQ_EXCHANGE_ORDER = "order.topic.exchange"; /** * 服务添加routing key */ String ROUTING_KEY_ORDER = "post.order";}
为了避免流控,我们定义了10个队列,并全部绑定到一个交换机上。
@Configurationpublic class RabbitmqConfig { @Bean public ListorderQueues() { List queues = new ArrayList<>(); for (int i = 1;i < 11;i++) { Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i); queues.add(queue); } return queues; } @Bean public TopicExchange orderExchange() { return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER); } @Bean public List bindingOrders() { List bindings = new ArrayList<>(); for (int i = 1;i < 11;i++) { Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange()) .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i); bindings.add(binding); } return bindings; } }
重新封装消息提供者,每次发送都随机选取一个路由来进行发送。
@Slf4j@Componentpublic class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {@Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange,String routingKey,Object content) {log.info("send content=" + content); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); ThreadLocalRandom random = ThreadLocalRandom.current(); this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content)); }/** * 确认后回调: * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {if (!ack) {log.info("send ack fail, cause = " + cause); } else {log.info("send ack success"); } }/** * 失败后return回调: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); }/** * 对消息对象进行二进制序列化 * @param o * @return */ private byte[] serialize(Object o) { Kryo kryo = new Kryo(); ByteArrayOutputStream stream = new ByteArrayOutputStream(); Output output = new Output(stream); kryo.writeObject(output, o); output.close(); return stream.toByteArray(); } }
我们可以看到在ServiceOrder里,我们是通过异步来进行发送到。
Controller如下
@Slf4j@RestControllerpublic class OrderController {private ThreadLocalorderFactory = new ThreadLocal<>(); private ThreadLocal orderService = new ThreadLocal<>(); @Autowired private OrderBean orderBean; @Transactional @SuppressWarnings("unchecked")@PostMapping("/makeeorder")public Result makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {log.info(orderStr); Order order = setOrderFactory(orderStr,type); orderService.get().makeOrder(order); orderService.get().postOrder(order); return Result.success(orderService.get().getResult(order)); }/** * 判断是哪一种类型的订单来获取哪一种类型的具体订单工厂 * @param orderStr * @return */ private Order setOrderFactory(String orderStr,String type) { Class> classType = orderBean.getOrderMap().get(type); Object order = JSONObject.parseObject(orderStr, classType);// if (orderStr.contains("service")) {// order = JSON.parseObject(orderStr, ServiceOrder.class);// }else if (orderStr.contains("product")) {// order = JSON.parseObject(orderStr, ProductOrder.class);// } Class> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory"); this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));// if (order instanceof ServiceOrder) {// this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));// }else if (order instanceof ProductOrder) {// this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));// } orderService.set(orderFactory.get().getOrder()); return (Order) order; } }
最后是在我们的通知中心模块接收消息,同时对这10个队列实行监控
@Slf4j@Component@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1, OwnerCarCenterMq.ORDER_QUEUE + "_" + 2, OwnerCarCenterMq.ORDER_QUEUE + "_" + 3, OwnerCarCenterMq.ORDER_QUEUE + "_" + 4, OwnerCarCenterMq.ORDER_QUEUE + "_" + 5, OwnerCarCenterMq.ORDER_QUEUE + "_" + 6, OwnerCarCenterMq.ORDER_QUEUE + "_" + 7, OwnerCarCenterMq.ORDER_QUEUE + "_" + 8, OwnerCarCenterMq.ORDER_QUEUE + "_" + 9, OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})public class ServiceOrderConsummer {@Getter private QueueserviceOrders = new ConcurrentLinkedDeque<>(); @RabbitHandler public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {try {//告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); ServiceOrder order = unSerialize(data); this.serviceOrders.add(order); log.info(String.valueOf(order)); } catch (IOException e) { e.printStackTrace(); //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); log.info("receiver fail"); } }/** * 反序列化 * @param data * @return */ private ServiceOrder unSerialize(byte[] data) { Input input = null; try { Kryo kryo = new Kryo(); input = new Input(new ByteArrayInputStream(data)); return kryo.readObject(input,ServiceOrder.class); }finally { input.close(); } } }
项目启动后,我们可以看到rabbitmq的情况如下
现我们来对其进行压测,启动Jmeter,我们使用1000线程来进行压测测试。各配置如下
保存文件上传服务器,因为本人是华为云的服务器,故在服务器上进行压测,不进行远程压测
在服务器的jmeter的bin目录下输入
./jmeter -n -t model/rabbit.jmx -l log.jtl
这里-n为不启动图形界面,-t使用我们上传的配置文件,-l记录日志
压测结果如下
我们在压测过程中来看一下rabbitmq的UI界面
消费基本上是实时的,没有出现流控积压现象。
到此,相信大家对“RabbitMQ用多路由,多队列来破除流控”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
分享标题:RabbitMQ用多路由,多队列来破除流控
网站链接:http://pwwzsj.com/article/jcgejg.html