卡夫卡消费者名单

我得想办法问卡夫卡要一份主题清单。我知道我可以使用包含在 bin\目录中的 kafka-topics.sh脚本来完成这项工作。一旦我有了这个列表,我需要每个主题的所有消费者。我在那个目录中找不到脚本,在 kafka-consumer-api库中也找不到允许我这样做的类。

这背后的原因是,我需要找出主题的偏移量和消费者的偏移量之间的差异。

有没有实现这一点的方法? 或者我需要在每个消费者中实现这一功能?

210111 次浏览

High level consumers are registered into Zookeeper, so you can fetch a list from ZK, similarly to the way kafka-topics.sh fetches the list of topics. I don't think there's a way to collect all consumers; any application sending in a few consume requests is actually a "consumer", and you cannot tell whether they are done already.

On the consumer side, there's a JMX metric exposed to monitor the lag. Also, there is Burrow for lag monitoring.

Kafka stores all the information in zookeeper. You can see all the topic related information under brokers->topics. If you wish to get all the topics programmatically you can do that using Zookeeper API.

It is explained in detail in below links Tutorialspoint, Zookeeper Programmer guide

Use kafka-consumer-groups.sh

For example

bin/kafka-consumer-groups.sh  --list --bootstrap-server localhost:9092


bin/kafka-consumer-groups.sh --describe --group mygroup --bootstrap-server localhost:9092

you can use this for 0.9.0.0. version kafka

./kafka-consumer-groups.sh --list --zookeeper hostname:potnumber

to view the groups you have created. This will display all the consumer group names.

 ./kafka-consumer-groups.sh --describe --zookeeper hostname:potnumber  --describe  --group consumer_group_name

To view the details

GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER

I realize that this question is nearly 4 years old now. Much has changed in Kafka since then. This is mentioned above, but only in small print, so I write this for users who stumble over this question as late as I did.

  1. Offsets by default are now stored in a Kafka Topic (not in Zookeeper any more), see Offsets stored in Zookeeper or Kafka?
  2. There's a kafka-consumer-groups utility which returns all the information, including the offset of the topic and partition, of the consumer, and even the lag (Remark: When you ask for the topic's offset, I assume that you mean the offsets of the partitions of the topic). In my Kafka 2.0 test cluster:
kafka-consumer-groups --bootstrap-server kafka:9092 --describe
--group console-consumer-69763 Consumer group 'console-consumer-69763' has no active members.


TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
pytest          0          5               6               1               -               -               -
``




All the consumers per topic

(Replace --zookeeper with --bootstrap-server to get groups stored by newer Kafka clients)

Get all consumers-per-topic as a table of topictabconsumer:

for t in `kafka-consumer-groups.sh --zookeeper <HOST>:2181 --list 2>/dev/null`; do
echo $t | xargs -I {} sh -c "kafka-consumer-groups.sh --zookeeper <HOST>:2181 --describe --group {} 2>/dev/null | grep ^{} | awk '{print \$2\"\t\"\$1}' "
done > topic-consumer.txt

Make this pairs unique:

cat topic-consumer.txt | sort -u > topic-consumer-u.txt

Get the desired one:

less topic-consumer-u.txt | grep -i <TOPIC>

I do not see it mentioned here, but a command that i use often and that helps me to have a bird's eye view on all groups, topics, partitions, offsets, lags, consumers, etc

kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --all-groups

A sample would look like this:

GROUP TOPIC PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID HOST CLIENT-ID
Group Topic 2          7               7               0   <SOME-ID>   XXXX <SOME-ID>
:
:

The most important column is the LAG, where for a healthy platform, ideally it should be 0(or nearer to 0 or a low number for high throughput) - at all times. So make sure you monitor it!!! ;-).

P.S:
An interesting article on how you can monitor the lag can be found here.

You can also use kafkactl for this:

# get all consumer groups (output as yaml)
kafkactl get consumer-groups -o yaml


# get only consumer groups assigned to a single topic (output as table)
kafkactl get consumer-groups --topic topic-a

Sample output (e.g. as yaml):

name: my-group
protocoltype: consumer
topics:
- topic-a
- topic-b
- topic-c

Disclaimer: I am contributor to this project