Kafka重复消费的问题

背景

消费Kafka数据采用的是kafka-python这个包,原消费代码与官网示例一样。但是在生产环境中使用的时候,发现数据有一些莫名其妙的重复,1000条数据里面,完全去重之后大概有600条,这样的重复率算很高的了。于是进一步排查定位,为什么会出现重复消费的问题。

原消费代码

1
2
3
4
5
6
7
8
9
10
11
12
# 示例代码
from kafka import KafkaConsumer

topic = "test"
groupid = "qimingyu"
clientid = "0"
server_list = ["127.0.0.1:9092"]
consumer = KafkaConsumer(topic, group_id=groupid,
client_id=clientid,
bootstrap_servers=server_list)
for message in consumer:
print message.partition, message.offset

这是一个多进程的消费程序,假设所消费的topic分区数目是n,则起n个这样的进程,分别指定不同的clientid。理想情况下,一个进程会对应一个分区进行消费,这是由Kafka自带的reblance(消费者平衡)机制决定的。

根源分析

但是就是这样的代码,在生产环境中使用的时候,出现了数据被大量重复消费的情况。为什么会有这种情况?

首先出于性能考虑,该段代码采用的是自动提交offset,并且采用的是默认时间,每隔5秒,由消费者进程提交一次offset。

当多个进程采用一个groupid消费同一个topic的过程中,Kafka会有内部的reblance机制,也就是再平衡机制,这个机制设计的初衷是为了当有新的消费者进程加入或者旧的消费者进程退出的时候,Kafka会自动重新平衡分区与消费者进程的关系。因此Kafka内部有一个心跳机制进行检测消费者进程与分区的状态,当超过一定的时间,消费者进程没有响应的时候,就会触发这个reblance机制。

因此假设进程A正在消费分区1的信息,并提交了偏移量,之后又消费了10条数据,还没来得及提交偏移量的时候,reblance机制让进程B来继续消费分区1的信息,此时进程B会从上次进程A提交偏移量的地方开始消费,因此这10条数据就是重复消费的。

当reblance比较频繁的时候,就会造成大量数据的重复。

解决方式

将消费者进程与分区静态绑定,这样Kafka不会采用自动reblance机制,就不会重复消费数据。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 修改后的示例代码
from kafka import KafkaConsumer
from kafka import TopicPartition

topic = "test"
server_list = ["127.0.0.1:9092"]
groupid = "qimingyu"
clientid = "0"

consumer = KafkaConsumer(group_id=groupid,
client_id=clientid,
bootstrap_servers=server_list,
value_deserializer=valuedeserializer)
partition = TopicPartition(topic, int(clientid))
consumer.assign([partition])

for message in consumer:
print message.partition, message.offset

该段代码将分区与消费者进程一一绑定,取消了动态的reblance机制,因此这种情况下,不会再出现重复消费的问题。

测试

重新改写代码,运行24小时,经过日志检查,确实不再有重复消费的数据。经过对比,有几个进程依然采用原来的方式进行消费的,查看日志有900多条数据重复。说明这个方法能够解决Kafka中重复消费的问题。


【版权声明】
本文首发于戚名钰的博客,欢迎转载,但是必须保留本文的署名戚名钰(包含链接)。如您有任何商业合作或者授权方面的协商,请给我留言:qimingyu.security@foxmail.com
欢迎关注我的微信公众号:科技锐新

kejiruixin

本文永久链接:http://qimingyu.github.io/2018/06/30/Kafka重复消费的问题/

坚持原创技术分享,您的支持将鼓励我继续创作!

热评文章