如何分析KafkaConsumer
这期内容当中小编将会给大家带来有关如何分析Kafka Consumer,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
创新互联公司是一家集网站建设,博白企业网站建设,博白品牌网站建设,网站定制,博白网站建设报价,网络营销,网络优化,博白网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。
Kafka Consumer消费以组的方式划分,Topic中的每一个分区只会分给同一个组中的其中一个实例。这是基于队列模式,如果想基于发布订阅模式,那订阅同一个Topic的实例需要指定不同的组名。
必需参数
bootstrap.servers
Kafka服务器
group.id
Consumer Group的名字,唯一标识一个consumer group
key.deserializer
Key的反序列化,二进制的消息Key转换成具体的类型
value.desrializer
Value的反序列化,二进制的消息内容转换成具体的类型
主要参数
session.timeout.ms
coordinator检测失败的时间,通常需要设置一个较小的值,这样可以快速检测到consumer崩溃的情况,尽快开启rebalance。
max.poll.interval.ms
用于设置消息处理逻辑的最大时间
auto.offset.reset
consumer group无位移信息和位移越界时Kafka对应的策略。consumer group重启不会使用该策略,因为Kafka已经记录了group的唯一信息
earliest:从最早的位移开始消费,不一定就是0
latest:从最新位移处开始消费
none:如果无位移信息和位移越界,抛出异常。
enable.auto.commit
指定consumer是否自动提交位移,默认为true
fetch.max.bytes
指定consumer单次获取数据的最大字节数
max.poll.records
控制poll方法返回的最大消息数量
heartbeat.interval.ms
控制consumer group中成员感知rebalance的时间。
connections.max.idle.ms
空闲连接空闲时间超过该参数,会被关闭。
auto.commit.interval.ms
后台自动提交位移的时间间隔
消息轮询Poll
新版Consumer采用了类似Linux I/O模型Poll,使用一个线程管理多个socket连接,然后循环Poll消息。
poll方法返回的条件是要不获得了足够多的数据,或者超过了指定的超时时间。
位移管理
新版本的consumer位移已交由内部topic管理(_consumeroffsets),该Topic有多个分区,每个分区有多个副本(可以通过参数控制)。该内部Topic存在的唯一目的保存consumer提交的位移。
手动提交位移支持同步和异步,提交需要位移需要指定一个Map,key是TopicPartition,value是OffsetAndMetadata,里面存储了下一条待消费消息的offset。
上述就是小编为大家分享的如何分析Kafka Consumer了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注创新互联行业资讯频道。
新闻名称:如何分析KafkaConsumer
链接分享:http://pwwzsj.com/article/pcppcp.html