In Apache Kafka, why can't there be more consumer instances than partitions?

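I'm learning about Kafka and reading the introduction section here:

https://kafka.apache.org/documentation.html#introduction

specifically the portion about consumers. The second-to-last paragraph of the introduction reads:

Kafka does it better. By having a notion of parallelism (the partition) within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances than partitions.

My confusion stems from that last sentence, because in the image right above that paragraph, where the author depicts two consumer groups and a 4-partition topic, there are more consumer instances than partitions!

It also makes no sense that there can't be more consumer instances than partitions, because then partitions would be incredibly small, and it seems like the overhead of creating a new partition for each consumer instance would bog down Kafka. I understand that partitions are used for fault tolerance and for reducing the load on any one server, but the sentence above makes no sense in the context of a distributed system that is supposed to be able to handle thousands of consumers at a time.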


Ok, to understand it, one needs to understand several parts.

  1. To provide a total order, a message can be sent to only one consumer. Otherwise it would be extremely inefficient, because Kafka would need to wait for all consumers to receive the message before sending the next one:

However, although the server hands out messages in order, the messages are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the messages is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of "exclusive consumer" that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing.

Kafka does it better. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances than partitions.

Kafka only provides a total order over messages within a partition, not between different partitions in a topic.

Also, what you think of as a performance penalty (multiple partitions) is actually a performance gain, because Kafka can perform actions on different partitions completely in parallel, without one partition having to wait for another to finish.

  2. The picture shows different consumer groups, but the limit of at most one consumer per partition applies only within a group. You can still have multiple consumer groups.

In the beginning the two scenarios are described:

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.

So, the more consumer groups you have, the lower the performance, because Kafka has to deliver every message to each of those groups while still guaranteeing the order within each partition.

On the other hand, the fewer groups and the more partitions you have, the more you gain from parallelizing the message processing.
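To make the "one consumer per partition, but only within a group" point concrete, here is a minimal sketch in plain Java. It does not use the Kafka client at all: the class `GroupOwnershipSketch`, the method `owners`, and the consumer names C1 to C6 are made up for illustration, and the modulo dealing is just a stand-in for whatever assignment strategy Kafka actually applies.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of partition ownership; not Kafka's real assignor.
class GroupOwnershipSketch {

    // For every partition, picks exactly one owner in EACH group.
    static Map<Integer, Map<String, String>> owners(Map<String, List<String>> groups,
                                                    int partitionCount) {
        Map<Integer, Map<String, String>> result = new TreeMap<>();
        for (int p = 0; p < partitionCount; p++) {
            Map<String, String> perGroup = new TreeMap<>();
            for (Map.Entry<String, List<String>> group : groups.entrySet()) {
                List<String> members = group.getValue();
                // Simple modulo dealing, assumed here only for illustration.
                perGroup.put(group.getKey(), members.get(p % members.size()));
            }
            result.put(p, perGroup);
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed layout: two groups with 2 and 4 members on a 4-partition topic.
        Map<String, List<String>> groups = new TreeMap<>();
        groups.put("A", Arrays.asList("C1", "C2"));
        groups.put("B", Arrays.asList("C3", "C4", "C5", "C6"));
        owners(groups, 4).forEach((partition, owner) ->
                System.out.println("partition " + partition + " -> " + owner));
    }
}
```

In this layout there are six consumer instances for four partitions, yet every partition has one reader per group, never two readers from the same group.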

There is a reason why Kafka cannot support multiple consumers per partition.

The Kafka broker writes data to a file per partition. So if, say, two partitions are configured, the broker will create two files, and multiple consumer groups can be assigned to receive the messages.

Now, for each partition, only one consumer consumes messages, based on the offset in the file. For example, consumer 1 will first read messages from file offset 0 to 4096. These offsets are part of the payload, so the consumer knows which offset to use when requesting the next batch of messages.

If multiple consumers read from the same partition, then consumer 1 reads from the file at offsets 0-4096, but consumer 2 will still try to read from offset 0 unless it also receives the messages sent to consumer 1. If the same messages are sent to multiple consumers, that is not load balancing, so Kafka divides consumers into consumer groups: every consumer group receives the messages, but within a consumer group only one consumer receives each message.

It is important to recall that Kafka keeps one offset per [consumer-group, topic, partition]. That is the reason.
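A small sketch of what "one offset per [consumer-group, topic, partition]" implies. This is a toy in-memory map, not how Kafka stores offsets; `OffsetStoreSketch`, `Key`, and the group names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Toy in-memory offset store; a hypothetical model, not Kafka's implementation.
class OffsetStoreSketch {

    // Key mirroring the [consumer-group, topic, partition] triple.
    static final class Key {
        final String group;
        final String topic;
        final int partition;

        Key(String group, String topic, int partition) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key k = (Key) o;
            return partition == k.partition && group.equals(k.group) && topic.equals(k.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(group, topic, partition);
        }
    }

    private final Map<Key, Long> offsets = new HashMap<>();

    void commit(String group, String topic, int partition, long offset) {
        offsets.put(new Key(group, topic, partition), offset);
    }

    // Returns 0 when nothing has been committed yet (a simplification).
    long committed(String group, String topic, int partition) {
        return offsets.getOrDefault(new Key(group, topic, partition), 0L);
    }

    public static void main(String[] args) {
        OffsetStoreSketch store = new OffsetStoreSketch();
        store.commit("groupA", "mytopic", 0, 4096L);
        store.commit("groupB", "mytopic", 0, 100L);
        // Each group keeps its own position on the same partition.
        System.out.println(store.committed("groupA", "mytopic", 0));
        System.out.println(store.committed("groupB", "mytopic", 0));
    }
}
```

Note that the key has no slot for a consumer instance: two members of groupA reading partition 0 would both resolve to the same entry, which is why they cannot track separate positions through the committed offset.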

I guess the sentence

Note however that there cannot be more consumer instances than partitions.

is referring to the "automatic consumer group re-balance" mode, the default consumer mode when you just subscribe() some number of consumers to a list of topics.

I assume that because, at least with Kafka 0.9.x, nothing prevents having several consumer instances, members of the same group, reading from the same partition.

You can do something like this in two or more different threads

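    // Assumes a broker at localhost:9092; adjust for your cluster.
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyConsumerGroup");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
    // assign() picks the partition manually instead of relying on group rebalancing
    TopicPartition partition0 = new TopicPartition("mytopic", 0);
    consumer.assign(Arrays.asList(partition0));
    ConsumerRecords<Integer, String> records = consumer.poll(1000);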

and you will have two (or more) consumers reading from the same partition.

Now, the "issue" is that both consumers share the same committed offset; you have no other option, since there is only one group, topic and partition in play.

If both consumers read the current offset at the same time, then both of them will read the same value, and both of them will get the same messages.

If you want each consumer to read different messages, you will have to synchronize them so that only one at a time can fetch and commit the offset.
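As a rough illustration of that synchronization, here is a sketch that leaves Kafka out entirely: a list stands in for the partition, a `long` for the shared offset, and `synchronized` makes "read offset, fetch, commit" a single step. The names `SharedOffsetSketch`, `fetchAndCommit` and `readWithTwoThreads` are invented for this example, and a real setup would need coordination across processes, not just threads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

// Toy model: a list stands in for one partition, a long for the shared offset.
class SharedOffsetSketch {
    private final List<String> log;
    private long committedOffset = 0;

    SharedOffsetSketch(List<String> log) {
        this.log = log;
    }

    // Read the offset, take a batch, and advance the offset as ONE step.
    // `synchronized` lets only one reader at a time do this.
    synchronized List<String> fetchAndCommit(int maxRecords) {
        int from = (int) committedOffset;
        int to = Math.min(from + maxRecords, log.size());
        committedOffset = to;
        return new ArrayList<>(log.subList(from, to));
    }

    // Two threads drain the same "partition"; returns everything they read.
    static List<String> readWithTwoThreads(int messageCount, int batchSize) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < messageCount; i++) {
            log.add("msg-" + i);
        }
        SharedOffsetSketch partition = new SharedOffsetSketch(log);
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        Runnable reader = () -> {
            List<String> batch = partition.fetchAndCommit(batchSize);
            while (!batch.isEmpty()) {
                seen.addAll(batch);
                batch = partition.fetchAndCommit(batchSize);
            }
        };
        Thread t1 = new Thread(reader);
        Thread t2 = new Thread(reader);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        List<String> seen = readWithTwoThreads(100, 10);
        System.out.println("read: " + seen.size());
        System.out.println("distinct: " + new HashSet<>(seen).size());
    }
}
```

Both counts come out as 100, so no message is read twice. What is lost is the processing order across the two readers, since the batches are handled by different threads.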

In Kafka, only one consumer instance in a group can consume messages from a given partition. If there are more consumer instances than partitions, the extra consumer instances are of no use, so Kafka gives these extra instances nothing to consume.

Now, if multiple consumers could consume from one partition, there would be no ordering in the consumption of messages. This is the reason why Kafka doesn't allow multiple consumers per partition.

Kafka's consumer group model is a hybrid of a queuing mechanism, where a message is deleted from the queue as soon as one consumer instance reads it, and a pub/sub mechanism, where a message is not deleted until its retention period expires and stays available to all consumer instances until then. So if you have a use case where you want to use Kafka, which is a pub/sub model, as a queuing mechanism, you create one consumer group for all your consumer instances. Because Kafka distributes partitions among the consumer instances within a single consumer group, each message is processed by only one instance in that group. If Kafka let you have more active consumer instances than partitions within a single consumer group, it would defeat the purpose of having the consumer group.

Consider this example:

REST API pub1 publishes 4 messages to topic1, which has 4 partitions, part1 through part4, so each partition holds 1 message.

You have 2 microservices, sub1 and sub2, as subscribers, and 4 instances of each microservice are running.

Now if you create 2 consumer groups, one for each microservice, sub1instance1 will be mapped to part1, sub1instance2 to part2, and so on. Similarly, sub2instance1 will be mapped to part1, sub2instance2 to part2, and so on.

As long as the number of consumer instances within each consumer group is less than or equal to the number of partitions, each microservice processes each message only once. In this case sub1instance1 and sub2instance1 will each process msg1 from part1.

If there were more consumer instances than partitions, Kafka would have to assign the same partition to multiple consumer instances, so messages would be processed multiple times, once by each consumer instance mapped to that partition. This is the reason why Kafka prevents us from having more consumer instances within a consumer group than the number of partitions in the topic that the consumer group is subscribed to.

Hope this makes sense.

Let's think of it this way: we know that a consumer group can subscribe to multiple topics, right? We can also assume that each topic it has subscribed to has a different number of partitions. That is possible, right?

In this case the rule that the number of instances must EQUAL the number of partitions can't hold for all the topics, since each topic is assumed to have a different number of partitions. Therefore, for the same consumer group we will have i == p for some topic, i < p for some topic, and i > p for some topic.

In other words, ideally you would want the number of instances in the consumer group to be at least EQUAL to the number of partitions in the topic, but if you end up with more instances it won't fail or cause harm: for that topic, the extra instances will simply remain idle.

Example:

  • topic A with 2 partitions

  • topic B with 3 partitions

  • consumer group with 3 instances

      A[1 2]   B[1 2 3]
    
    
    [x y z] (consumer group)
    

Now, for topic 'B' all 3 consumer instances will be active (reading from 1 partition each); for topic 'A', however, only 2 of the 3 consumer instances will be active (1 of them will be idle, as the topic has just 2 partitions).
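The example above can be worked through with a short sketch. It is plain Java, not the Kafka client, and it assumes a range-style split done topic by topic; `MultiTopicAssignmentSketch` and `assign` are hypothetical names, and partitions are numbered from 1 only to match the diagram. Which instance ends up idle for topic A is a consequence of this particular sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy per-topic range assignment; names and numbering are illustrative only.
class MultiTopicAssignmentSketch {

    // Splits each topic's partitions into contiguous ranges over the members.
    // When a topic has fewer partitions than members, the last members get
    // nothing from that topic.
    static Map<String, List<String>> assign(List<String> members,
                                            Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> owned = new LinkedHashMap<>();
        for (String member : members) {
            owned.put(member, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int count = topic.getValue();
            int base = count / members.size();   // partitions every member gets
            int extra = count % members.size();  // first `extra` members get one more
            int next = 1;                        // numbered from 1, as in the diagram
            for (int i = 0; i < members.size(); i++) {
                int take = base + (i < extra ? 1 : 0);
                for (int k = 0; k < take; k++) {
                    owned.get(members.get(i)).add(topic.getKey() + next++);
                }
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("A", 2);
        topics.put("B", 3);
        // prints {x=[A1, B1], y=[A2, B2], z=[B3]}
        System.out.println(assign(Arrays.asList("x", "y", "z"), topics));
    }
}
```

Instance z owns nothing from topic A but still does useful work on topic B, so having more instances than one topic has partitions does no harm.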