RocketMq的理解和记录-创新互联
- ActiveMQ:
- 消费消息的异步操作的代码逻辑图示和讲解
- 临时节点创建两端会话的情况
- 遇到的问题
- RocketMQ
- rocketMq 事务消息:
- rocketMq中使用消息事物的流程
- RocketMQ角色
- broker
- broker集群
- producer
- consumer
- namespace
- 对比JMS中的Topic和Queue
- 发送方式
- 批量发送消息
- 消费消息模式
- 集群消息
- 广播消息
- RocketMq源码一级高级Api
- Offset
- minOffset
- maxOffset
- consumerOffset
- diffTotal
- 消费者
- DefaultMQPushConsumer与DefaultMQPullConsumer
- 集群消息的消息负载均衡
- ProcessQueue
- 长轮询
- 短轮询
- 长轮询
- 长连接
- 消息存储
- 磁盘存储速度问题
- 省去DB层提高性能
- M.2 NVME协议磁盘存储
- 数据零拷贝技术
- 内存映射MapperdByteBuffer API
- 存储结构
- CommitLog
- CommitLog内部结构
- ConsumerQueue
- indexFile
- 存储路径配置
- config
- consumerFilter.json
- consumerOffset.json
- delayOffset.json
- subscriptionGroup.json
- topics.json
- 刷盘机制
- 写入是消息会不会分割到两个MapedFile中?
- 同步刷盘
- 异步刷盘
- 配置选项
- 消息存储流程
- broker启动流程
- 消息持久化流程
- NameServer特点
- NameServer启动流程
- 消息路由之HeartBeat信息收集
- 集群模式消息存储架构模型
- 消息路由发现
主题 topic:只有已经启动的连接mq的消费者才能接受到消息,如果没有客户端连接消息就不会广播出去;数据默认不落地,是无状态的
队列 queue:只支持一次消费,如果没有消费端去消费就存储在mq中,没有被消费的消息可以实持久化;数据默认在mq服务器以文件形式保存,$AMQ_HOME\data\kahadb下面,也可以配制成DB存储
可以放的数据
读的亿级流量,写的时候可能是千级需要一个高性能的客户端,用nginx写lua脚本,把信息从nginx到mq里打,标记接收数据的标识,说明这个数据未被消费,后面启动一些数据慢慢消费,两阶段提交TCC借助了Mq的确认机制来完成分布式事物
集群可以理解成复制;分布式是每个服务里不一样
一个项目一个账号
broker指的是当前服务器的配置
activeMQ.DLQ默认死信队列
topic模式下,消息发送没有人接收,那么消息就会存在内存里;有一个人消费那么消息就不会存在内存里;未消费的数据超过过期时间就会进入死信队列,producer监听死信队列就可以获取未消费的数据重新发送;消息重投
还有一种消息重投机制:基于mq重投,类似于定时任务重复投递,发了ack也要重投,这条消息设置某个时间点重投十次
(阻塞就会导致高并发问题,导致下一个消息不能被消费)
调优思路:已知这几个条件可以调优
1.消息是不是持久化持久化之后没有被消费,是不是要过期?如果不过期那么一直就在内存里堆积,直到重启broker才会消失,那么想要标识一下,要么手动清理,要么放在死信队列里面;当放入死信队列(持久化的消息才会死信队列,非持久化消息需要改配置,改完配置才会进入死信队列,也可以直接把死信队列关了来提升整体服务器的性能)
topic是不持久化,队列是持久化,消息会进死信队列
独占消费者:就是一个consumer一直在某一个provider上消费数据
mysql journal给提供的便利
Mq持久化是先往文件缓冲区里写(速度快因为是独占这一个空间只写),然后再往mysql里写(可以配置多久之后往mysql中写),消费的消息先从文件中消费,消费过后再从文件中剔除掉;剔除掉之后到规定时间再放到mysql中(在这个时间窗口内,被消费了就不在写入mysql)
Springboot使用:
JmsMessagingTemplate是JmsTemplate的包装使用
接收者写法
发送者写法
active如何防止消息丢失?会不会丢消息?
(一方面是windowSize满了之后就不能再放进去了-如何避免需要组建高可用集群)
(高性能-消息分片 不同消息放在不同的队列里,高可用-主从节点)
死信队列、ReplatTo、持久化(能做尽量做除非要求极高的性能)、ack、消息重投、记录日志、接受(消费)确认、broker负载/限流、独占消费者、通过tempDestination同步消息
如何防止重复消费?
map concurrentHashMap->putIfAbsent guava cache(多了个超时机制)
如何保证顺序消费?
queue(优先级不要设置) 多消费端->
集群式消费点对点模式
broker对group1里的client1和group2里的client1 client2 发送消息
保持顺序消费的原理:
消息队列支持FIFO原则;1、同一topic(用一群queue组成,底层的物理结构是queue->相当于无限长的数组 ) 2、同一queue(QUEUE才是保证了先进先出)3、发消息的时候同一线程去发送消息4、消费的时候一个线程,消费一个queue里的消息5、多个queue之恩那个保证单个queue里的顺序
保证同一queue
保证同一线程 DefaultMsg默认同一线程,transationMsg可以设置线程池
保证同一线程消费
对于接受的数据做过滤,需要在header里setProperty
生产环境上链接方式的不同对性能有很大的影响;nio是基于tcp的连接协议(提升服务器的性能,并发连接数提升) auto+nio(auto代表包含了另外的四种协议)
推送消息的前提条件是1.设置了preFetchSize 2.consumer没有满
消息堆积的意义,consumer会在内存那里有一个缓冲区
如果 preFetchsize里面有很多消息没有消费完,那么就是堆积
broker会记录每一个consumer的pSize(是consumer把pSize传给broker),broker在推消息的时候就会判断consumer的pSize是否等于0,等于0结束不推消息;如果不等于0判断是否满了,满了不推送;如果满足条件consumerPsize执行加一,然后把消息推送给consumer;consumer在自己的缓冲区多一条消息;这条消息是unconsumer;当consumer启动消费线程的时候,会从缓冲区消费消息;消费完成后再次向broker发送ack;然后broker到consumerPSize里进行减一操作;如果有持久化就删除,内存没推的消息减一
提供了异步 缺点I/O瓶颈;一个p等待c回复就会有一个临时节点;就会出现临时节点过多的问题,会把broker内存撑爆(如果有1w个p和c就会有1w个临时节点)
topic加强可追溯消息:
topic配置可以保存最后几条 (在consumer没有在线的时候,后续上线也可以消费消息)
prefetchSize消费倾斜:多个consumer但其中一个把所有的消息都消费了
解决办法:满速效肥的时候可以吧prefetchSize设为1,每次取一条
prefetchSize造成消费者内存溢出
AUTO_ACKNOWLEDGE造成消息丢失/乱序:
消息消费失败后,无法复原消息,可以手动ack避免broker把消息自动确认删除
receive()方法接受到消息后立即确认
listener的onmessage方法执行完毕才会确认
手动ack的时候要等connetion断开才会重新推送给其他的consumer,所以有可能会导致消费顺序混乱
exclusive和selector有可能造成消息堆积(设置排他,消息消费不完,不会把消息发给其他的consumer,broker就会堆积消息,堆积到一定程度,produce就不能再提交消息了)
activeMQ集群:
1 主备切换 --性能没有提高 提高了可用性
service1启动成功机会上锁;其他的service智能retry;当service1宕机,其他服务会连接上
2 共用数据源 --性能没有提高 提高了可用性
网络桥接方式两个broker之间通信来互通数据(这种方式不适合大数据量吞吐,会有网络IO的瓶颈,多数是在topic的模式下,就是拉topic,对多个client广播,节约资源)
动态网络协议:通过udp来发现新加入的broker 动态扩容
RocketMQ rocketMq 事务消息:2pc 两阶段提交 rocketMq使用的是2pc;最典型的方案就是数据库的事物(特点);需要b的一方实现xa协议
1、尝试提交(不会真的提交会维护一个状态,不可用) 2、确认ok (确认之后状态可用)
tcc (try confirm cancel)三阶段提交分为3步
1、发送数据就是try b开始lock数据 2、a执行本地事物当conmit的时候,confirm数据,释放b这个锁 3、如果失败就cancal方法(try confirm cancal都有b实现)
1、首先发送方要开启事务
2、发送消息这一端要发送消息(这个时候并不是真的要发送消息 HFMessage(一般消息发出去)或者叫做preparaMessage)
3、发送消息给broker,broker接到hfMessage后(其实主要标识一下这个produce发送事物了)
4、broker接到这个消息后会写到专门的消息队列 hf
5、写进去之后,同步或者异步保存到磁盘(那么这条消息就算是收到了)
6、把确认信号给producer(保证发送消息也是可靠的)
7、这个时候就可以真正的可以执行本地事务了
之前在send方法的时候,这个broker会返回信号
如果rollback就把HFM队列的消息撤销掉,commit的话就把Hf队列消息撤销掉,把真正的消息发送出去
8、为了保证执行本地任务时间过长的问题,broker会开启一个定时任务(跑的就是Hf队列里的半消息)
9、取到事务相关信息,检查7有没有完成
10、produce会发起一个回调方法,按照本地逻辑回馈 成功或者失败 (如果连接不上回调方法重试几次后就会剔除这个事务,producer就会重新发这个消息)
11、consumer直接订阅消息
执行过的消息会存在系统磁盘上,一定时间后会清除消息
tcc一般是框架,需要中间件支持
2pc一般是基于数据库或者第三方的中间件比如说redis,一些mq或者说数据库这些
- Broker面向producer和consumer接收和发送消息
- 向nameserver提交自己的信息
- 是消息中间件的消息存储、转发服务器
- 每个broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后上报(broker中的master和slave,master只支持写发,slave只支持发,p只能连接master,c可以连接master和slave)
(producer想要发送到某一个broker上,会先去找nameserver的topic,发送到某一个topic上,topic知道是哪个broker,consumer同理;broker可以动态的上下线,namespace是无状态的)
- Broker高可用,可以配成Master/Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave
- 一个Master可以对应多个slave,但是一个Slave只能对应一个Master
- Master与Slave的对应关系通过指定相同的brokerName,不同的BrokerId来定义BrokerId为0表示Master,非0表示Slave
- Master多机负载,可以部署多个broker
- 每个broker与namespace集群中的所有节点建立长连接,定时注册Topic信息到所有的namesever
- 消息的生产者
- 通过集群中的其中一个节点(随机选择)建立长连接,获取Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等
- 接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳
消息的消费者,通过namespace集群获取Topic的路由信息,连接到对应的Broker上消费消息
注意,由于Master和salve都可以读取消息,因此Consumer会与Master和Slave都建立连接
底层由netty实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点
namespace是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时向namespace上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表剔除
**nameserver可以部署多个,**当多个namespace存在的时候,其他角色同时向他们上报消息,以保证高可用
**namespace集群间互不通信,**没有主备概念
为什么不用zookeeper?:rockerMq希望提高性能,CAP定理,客户端负载均衡
Topic是一个逻辑上的概念,实际上Message是每个Broker上以及Queue的形式记录
(和activeMq做对比,activeMq中遵循JMS标准,有两种消息类型一种是Topic一种是Queue,topic相当于广播,可以被多个人消费,queue只能被单个消费者消费;在RocketMq中没有topic的概念,只有一种Queue,Queue在rocketmq中可以广播,广播在客户端设置,通过客户端设置这条消息的属性,消费者来设置; 一个topic中可以包含多个queue,topic中queue太多的话,可以存在多个broker上,这时候就是对于一个broker进行分片处理;topic在使用的时候只是逻辑上存在,在使用的时候其实就是消息队列,队列的名字是什么,往这个队列里面写数据,topic和queue相当于包含关系)
可以多条消息打包一起发送,减少网络传输次数提高效率
- 批量消息要求必要具有同一topic,相同消息配置
- 不支持延时消息
- 建议一个批量消息最好不要超过1MB大小
- 如果不确定是否超过限制,可以手动计算大小分批发送
集群消息是指集群化部署消费者
当使用集群消费模式时,MQ认为任意一条消息只需要被集群内的任意一个消费者处理即可
特点
- 每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
- 在消息重投时,不能保证路由到同一台机器上
- 消费状态由broker维护
当使用广播消费模式时,MQ会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次
特点
- 消费进度由consumer维护
- 保证每个消费者消费一次消息
- 消费失败的消息不回重投
消息过滤,根据Tag来过滤,根据key来做模糊查找(tag selector在一个group中的消费者,都不能随便变,要保持统一;某一个组刚刚关注了tag-a 又换了关注了tag-b就会混乱接收不到消息;但是可以事同一个topic)
组:
设置tag
每个broker中的queue在收到消息时会记录offset,初始值为0,每记录一条消息offset会递增+1
minOffset最小值
maxOffset大值
consumerOffset消费者消费进度/位置
diffTotal消费积压/未被消费的消息数值
消费者 DefaultMQPushConsumer与DefaultMQPullConsumer在消费端,我们可以视情况来控制消费过程
DefaultMQPushConsumer由系统自动控制过程
- 消息的消费者new出来一个group DefaultMQPushConsumer consumer = new DefaultMQPushConsumer (''consumer01")
- 然后set地址连接 consumer.setNamesrvAddr(“ip:端口”);
- 然后订阅 consumer.subscribe(“myTopic01”)
- 然后registerMessageListener(new Listener(){回调})
- 启动 consumer.start
start后客户端启动成功了,消息会放在回调里,messageListener会逐步的消费消息
rocketMq伪代码整体启动流程
DefaultMQPullConsumer大部分功能需要手动控制 去掉了换成了DefaultMQLiteConsumer
集群消息的消息负载均衡在集群消费模式下(clustering)
相同的group中的每个消费者只消费topic中的一部分内容
group中所有消费者都参与消费过程,每个消费者消费的内容不重复,从而达到负载均衡的效果
使用DefaultMQPushConsumer,新启动的消费者自动参与负载均衡
消息处理类拉取回来的消息,也可认为是消息缓冲区
长轮询Consumer->Broker RocketMq才用的长轮询建立连接
- consumer的处理能力Broker不知道
- 直接推送消息broker端压力较大,需要维护consumer的状态
- 采用长链接有可能consumer不能及时处理推送过来的数据
- pull主动权在consumer手里
(RabbitMQ无论是pull还是push底层用的都是pull这说明了一个问题,RabbitMQ的网络模型是长轮询-介于长连接和短轮询之间.)
client不断发送请求到server,每次都需要重新连接
比较常见的事 http协议
client不断发送请求到server,server有数据返回,没有数据请求挂起不断开连接
好处:client掌握了主动权,不会导致消息重投
连接一旦建立,永远不断开,push方式推送
好处:没有延迟
坏处:server这一端要维护client的状态,对于服务器来说性能开销要高于轮询;还有一个问题服务器不知道client的消费速度,盲目的消息推过去导致消息堆积,消息堆积导致的后果就是c1堆积消息了,分配给c1的消息c2消费不到,服务器拿不到该消息的状态就会还往c1推送消息,导致消息很久得不到消费消息会退还给server造成消息重投
相当于websocket协议
源码解析processQueue
真实拉取数据的转换和协议
客户端向服务器连接发心跳包
客户端分发排序配置
pullMessageService拉取消息
发心跳包,然后设置间隔时间;然后client启动流程,开启schedule定时器(获取nameserver地址,定时更新路由信息,清除无效broker,向broker发送心跳包保证连接状态,持久化消息);消息拉取;消息填充
rebalanceServer负载消息
RocketMQ使用文件系统持久化消息,性能要比使用DB产品要高
文件写入速度顺序读写:3G左右 随机读写:2G
数据零拷贝技术很多实用文件系统存储的高性能中间件都是使用了零拷贝技术来发送文件数据,比如Nginx
rocketMQ想要拿一份数据
- 先把请求发送给内核
- 内核到磁盘找到数据
- 找到的数据复制一份加载到内核
- 再复制一份发给Mq
想要发出去 - 再复制一份交给网卡驱动,由内核提交
启动了数据0拷贝,只需要一个信号给内核,内核拿到数据直接推到网卡上
复制在内存里,数据量比较大占用数据总线时间很长
- MappedByteBuffer使用虚拟内存,因此分配(map)的内存大小不受JVM的-Xmx参数限制,但是也是有大小限制的
- 如果当文件超出1.5G限制时,可以通过position参数重新map文件后面的内容
- MappedByteBuffer在处理大文件时的性能的确很高,但也存在一些问题,如内存占用、文件关闭不确定,被其打开的文件只有在垃圾回收的时候才会被关闭,而且这个时间点是不确定的
javadoc中也提到:A mapped byte and the file mapping that it represents remain* valid until the buffer itself is garbage-collectes
所以为了使用零拷贝技术,RocketMQ的文件存储大小默认每个1g,超过1g会重新建立一个新文件
存储消息的详细内容,按照消息收到的顺序,所有消息都存储在一起.每个消息存储后都会有一个offset,代表在commitLog中的偏移量
默认配置 MessageStoreConfig
核心方法
- putMessage写入消息
- MappedFileQueue->MappedFile
MappedFile 默认1G
通过消息偏移量建立的消息索引
针对每个Topic创建,消费逻辑队列,存储位置信息,用来快速定位CommitLog中的数据位置
启动后会被加载到内存中,加快查找消息速度
消息被broker写入磁盘后再给producer响应
异步刷盘消息被broker写入内存后立即给producer响应,当内存中消息堆积到一定程度的时候写入磁盘持久化
配置选项1、没状态 2、高可用 3、高性能 4、数据会有不一致
NameServer采用的是broker定时向所有的NameServer发送心跳包,容错信息放在客户端来做(注册中心简化了,客户端就复杂了)
producer发送消息正常,broker之间发送信息异常,nameserver就要规避这样的问题,就会把这个broker踢掉,来维护本地的topic信息,那么producer取到的topic信息就是准确的。
对于已订阅的consumer来说剔除掉了一个broker,那么nameserver里的信息没有了,那么就得不到正确的broker地址,怎么解决?
一次订阅会定时拉取
1和broker建立连接 2和producer和consumer建立连接
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
网页名称:RocketMq的理解和记录-创新互联
文章来源:http://pwwzsj.com/article/dhipdh.html