是什么决定了Kafka的消费抵消?

我对卡夫卡还比较陌生。我已经做了一些试验,但一些事情是不清楚的,我对消费者抵消。根据我到目前为止的理解,当一个消费者开始时,它将开始读取的偏移量由配置设置auto.offset.reset决定(如果我错了请纠正我)。

现在假设主题中有10条消息(偏移量从0到9),一个消费者碰巧在它关闭之前(或者在我杀死消费者之前)使用了其中的5条消息。然后重新启动该使用者进程。我的问题是:

  1. 如果auto.offset.reset被设置为earliest,它总是从偏移量0开始消费吗?

  2. 如果auto.offset.reset被设置为latest,它是否会从偏移量5开始消费?

  3. 这种情况下的行为总是确定性的吗?

如果我的问题有不清楚的地方,请不要犹豫评论。

147953 次浏览
这比你描述的要复杂一点。
auto.offset.reset配置只在你的消费组没有一个有效的偏移量提交到某个地方时才会起作用(现在支持的2个偏移量存储是Kafka和Zookeeper),这也取决于你使用什么类型的消费者

如果你使用高级java消费者,那么想象以下场景:

  1. 你在消费组group1中有一个消费者,它已经消费了5条消息并且已经死亡。下次你启动这个消费者时,它甚至不会使用auto.offset.reset配置,而是从它死亡的地方继续,因为它只会从偏移存储(我提到的Kafka或ZK)中获取存储的偏移。

  2. 你在一个主题中有消息(就像你描述的那样),你在一个新的消费组group2中开始一个消费者。没有偏移量存储在任何地方,这一次auto.offset.reset配置将决定是从主题的开始(earliest)还是从主题的结束(latest)开始。

影响偏移值对应earliestlatest配置的另一个因素是日志保留策略。假设您有一个主题,保留时间配置为1小时。你产生5条消息,然后一个小时后你再发布5条消息。latest偏移量仍然和前面的例子一样,但是earliest偏移量将不能是0,因为Kafka已经删除了这些消息,因此最早可用的偏移量将是5

上面提到的一切都与SimpleConsumer无关,每次你运行它时,它将决定从哪里开始使用auto.offset.reset配置。

如果你使用的Kafka版本高于0.9,你必须将earliestlatest替换为smallestlargest

只是一个更新:从Kafka 0.9开始,Kafka使用了一个新的Java版本的消费者,并且auto.offset.reset参数名已经更改;摘自手册:

当Kafka中没有初始偏移量时该怎么办 偏移量不再存在于服务器上(例如,因为该数据 已删除):

最早的:自动将偏移量重置为最早的偏移量

最新的:自动将偏移量重置为最新偏移量

没有一个:如果没有找到之前的偏移量,则向使用者抛出异常 对于消费者组

其他情况:向使用者抛出异常。

在检查了被接受的答案后,我花了一些时间找到了这个答案,所以我认为它可能对社区发布有用。

此外,还有补偿。留存。分钟。如果距离上次提交的时间为> offsets.retention.minutes,则auto.offset.reset也会生效