RabbitMQ可靠投递

  • 背景
  • confirmCallback确认模式
  • returnCallback未投递到 queue退回模式
  • shovel-plugin跨机房可靠投递

背景

在使用 RabbitMQ的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ为我们提供了两个选项用来控制消息的投递可靠性模式。

创新互联建站服务项目包括武邑网站建设、武邑网站制作、武邑网页制作以及武邑网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,武邑网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到武邑省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!

rabbitmq整个消息投递的路径为:
producer->rabbitmq broker cluster->exchange->queue->consumer

messageproducerrabbitmq broker cluster则会返回一个 confirmCallback
messageexchange->queue投递失败则会返回一个 returnCallback。我们将利用这两个 callback控制消息的最终一致性和部分纠错能力。

confirmCallback 确认模式

在创建 connectionFactory的时候设置 PublisherConfirms(true)选项,开启 confirmcallback

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);//开启confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
               log.error("消息发送失败!" + cause + data.toString());
        } else {
            log.info("消息发送成功,消息ID:" + (data != null ? data.getId() : null));
        }
    });

我们来看下 ConfirmCallback接口。

public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(CorrelationData correlationData, boolean ack, String cause);

    }

重点是 CorrelationData对象,每个发送的消息都需要配备一个 CorrelationData相关数据对象,CorrelationData对象内部只有一个 id属性,用来表示当前消息唯一性。

发送的时候创建一个 CorrelationData对象。

User user = new User();
user.setID(1010101L);
user.setUserName("plen");

rabbitTemplate.convertAndSend(exchange, routing, user,
        message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        },
new CorrelationData(user.getID().toString()));

这里将 user ID设置为当前消息 CorrelationData id。当然这里是纯粹 demo,真实场景是需要做业务无关消息 ID生成,同时要记录下这个 id用来纠错和对账。

消息只要被 rabbitmq broker接收到就会执行 confirmCallback,如果是 cluster模式,需要所有 broker接收到才会调用 confirmCallback

broker接收到只能表示 message已经到达服务器,并不能保证消息一定会被投递到目标 queue里。所以需要用到接下来的 returnCallback

returnCallback 未投递到queue退回模式

confrim模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue里,此时就需要用到 return退回模式。

同样创建 ConnectionFactory到时候需要设置 PublisherReturns(true)选项。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//开启return模式
rabbitTemplate.setMandatory(true);//开启强制委托模式

rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                    exchange, routingKey) ->
    log.info(MessageFormat.format("消息发送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

这样如果未能投递到目标 queue里将调用 returnCallback,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。

shovel-plugin 跨机房可靠投递

RabbitMQ在跨机房集成提供了一个不错的插件 shovel。使用 shovel-plugin插件非常方便,shovel可以接受机房之间的网络断开、机器下线等不稳定因素。

这里有两个 broker

10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2

我们希望将发送给 rabbit_node1 plen.queue 的消息传输到rabbit_node2 plen.queue中。我们先开启 __rabbit_node1shovel-plugin__。

先看下当前 RabbitMQ版本是否安装了 shovel-plugin,如果有的话直接开启。

rabbitmq-plugins  list
rabbitmq-plugins  enable rabbitmq_shovel
rabbitmq-plugins  enable rabbitmq_shovel_management

然后就可以在 Admin面板里看到这个设置选项,怎么设置这里就不介绍了。主要就是配置下 amqp协议地址,amqp://user:password@server-name/my-vhost

如果配置没有问题的话,应该是这样的一个状态,说明已经顺利连接到 __rabbit_node2 broker__ 。

RabbitMQ 可靠投递
RabbitMQ 可靠投递

我们来看下 rabbit_node1rabbit_node2Connections面板。
__rabbit_node1(10.211.55.3):__
RabbitMQ 可靠投递

__rabbit_node2(10.211.55.4):__
RabbitMQ 可靠投递

RabbitMQ shovel-plugin插件在 rabbit_node1 broker 创建了两个tcp 连接,端口 39544 连接是用来消费plen.queue 里的消息,端口 55706 连接是用来推送消息给rabbit_node2

我们来看下 __rabbit_node1 tcp__ 连接状态:

tcp6       0      0 10.211.55.3:5672        10.211.55.3:39544       ESTABLISHED
tcp        0      0 10.211.55.3:55706       10.211.55.4:5672        ESTABLISHED

__rabbit_node2 tcp__ 连接状态:

tcp6       0      0 10.211.55.4:5672        10.211.55.3:55706       ESTABLISHED

为了验证 shovel-plugin稳定性,我们将 __rabbit_node2__ 下线。
RabbitMQ 可靠投递

然后再发送消息,发现消息会现在 rabbit_node1plen.queue 里待着,一旦shovel-plugin 连接恢复将消费rabbit_node1 plen.queue消息,然后投递给 __rabbit_node2 plen.queue__ 。

作者:王清培 (沪江集团资深JAVA架构师)


分享文章:RabbitMQ可靠投递
文章网址:http://pwwzsj.com/article/gseoee.html