kafka消息丢失怎么办

本篇内容主要讲解“kafka消息丢失怎么办”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“kafka消息丢失怎么办”吧!

创新互联建站坚持“要么做到,要么别承诺”的工作理念,服务领域包括:成都做网站、网站建设、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的个旧网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!

在处理生产环境问题的过程中发现12号那天某kafka集群有少量的数据丢失,概率大致在千万分之三。 数据写入kafka之后,就完全消失了,消费者完全没有消费到这个数据。 通过找到那天的数据,查看有问题的数据在写入kafka的时候上下文应用日志发现有少量以下报错:

[2019-10-12 11:03:43,xxx] This is not the correct coordinator.

理论上正常情况下kafka是不太可能丢数据的,如果出现这种情况,必然是开发人员或者硬件引发了什么问题,因为写入日志是有的,看了下应用配置

acks=1

马上意识到,问题突破口应该在这里。

acks=0 生产者能够通过网络吧消息发送出去,那么就认为消息已成功写入Kafka,一定会丢失一些数据
acks=1 master在疏导消息并把它写到分区数据问津是会返回确认或者错误响应,还是可能会丢数据
acks=all master在返回确认或错误响应之前,会等待所有同步副本都收到消息

可能以前是为了保证性能够快,选择了折中的应用配置 acks=1

马上想到去看下kafka的日志,猜测这个时间段必然出现出现了 master 不可用的情况才会导致数据丢失。

在kafka集群的57号节点的机器上看到这样一段日志:

[2019-10-12 11:03:39,427] WARN Client session timed out, have not heard from server in 4034ms for sessionid 0x396aaaadbbxx00 (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:40,908] INFO Client session timed out, have not heard from server in 4034ms for sessionid 0x396aaaabbxx00, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:41,253] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient)
[2019-10-12 11:03:41,962] INFO Opening socket connection to server xx.xx.xx.59/xx.xx.xx.59:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:41,962] INFO Socket connection established to xx.xx.xx.59/10.xx.xx.xx:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:42,293] WARN Unable to reconnect to ZooKeeper service, session 0x396cf664cdbb0000 has expired (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:42,293] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient)
[2019-10-12 11:03:42,294] INFO Initiating client connection, connectString=xx.xx.xx.55:2181,xx.xx.xx.56:2181,xx.xx.xx.57:2181,xx.xx.xx.58:2181,xx.xx.xx.59:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@342xxx2d (org.apache.zookeeper.ZooKeeper)
[2019-10-12 11:03:42,313] INFO Unable to reconnect to ZooKeeper service, session 0x396cxxxxxb0000 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:42,323] INFO EventThread shut down for session: 0x396cxxxx000 (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:42,342] INFO Opening socket connection to server xx.xx.xx.58/xx.xx.xx.58:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:42,343] INFO Socket connection established to xx.xx.xx.58/xx.xx.xx.58:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:43,516] INFO Session establishment complete on server xx.xx.xx.58/xx.xx.xx.58:2181, sessionid = 0x3ax4xxxxx01, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-10-12 11:03:43,517] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)

看来和猜测的情况是一样的,这个kafka的节点机器出现了连接不到zk,所以这台机器处于‘丢失’状态。又去看了下kafka的controller日志:

[2019-10-12 11:03:42,671] INFO [Controller id=56] Newly added brokers: , deleted brokers: 57, all live brokers: ...,55,56,58,59,xx.... (kafka.controller.KafkaController)
[2019-10-12 11:03:42,678] INFO [Controller-56-to-broker-57-send-thread]: Shutting down (kafka.controller.RequestSendThread)
[2019-10-12 11:03:42,749] INFO [Controller-56-to-broker-57-send-thread]: Stopped (kafka.controller.RequestSendThread)
[2019-10-12 11:03:42,749] INFO [Controller-56-to-broker-57-send-thread]: Shutdown completed (kafka.controller.RequestSendThread)
[2019-10-12 11:03:42,828] INFO [Controller id=56] Broker failure callback for 57 (kafka.controller.KafkaController)
[2019-10-12 11:03:42,836] INFO [Controller id=56] Removed ArrayBuffer() from list of shutting down brokers. (kafka.controller.KafkaController)

再次看了下这个节点系统的网络层面监控:

kafka消息丢失怎么办

真相浮出水面,和猜测吻合,这个问题和之前遇到的一个问题有点类似,不一样的情况是这次只丢失了一个节点,上次的问题是所有的网络节点都短暂的丢失,详情可以看我之前写的博客: https://my.oschina.net/110NotFound/blog/3105190

这次的原因分析:

57号节点丢失之后,立马进行了选举,这个时候数据在生产的时候到达了master,恰好选举的时候变更了master,而 acks=1 ,数据已经进入master,但是还没有来得及同步到slave,这样就导致了数据的丢失。

总结:

如果有对数据有强一致性的要求,一定要选择 acks=all 否则指不定哪天硬件的轻微系统抖动就会导致 kafka 集群重新选举,丢失数据。

到此,相信大家对“kafka消息丢失怎么办”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!


本文标题:kafka消息丢失怎么办
网址分享:http://pwwzsj.com/article/joheeg.html