Java, How to get number of messages in a topic in apache kafka

I am using apache kafka for messaging. I have implemented the producer and consumer in Java. How can we get the number of messages in a topic?

202560 次浏览

从消费者的角度考虑这个问题的唯一方法是实际使用消息,然后对其进行计数。

Kafka 代理公开了自启动以来接收的消息数量的 JMX 计数器,但是您无法知道其中有多少已经被清除。

在大多数常见的场景中,卡夫卡语言中的消息最好被视为一个无限的流,并且得到当前保存在磁盘上的消息数量的离散值是不相关的。此外,当处理一组代理时,事情变得更加复杂,这些代理在一个主题中都有一个消息子集。

I haven't tried 这个 myself, but it seems to make sense.

You can also use kafka.tools.ConsumerOffsetChecker (来源).

它不是爪哇语,但可能是有用的

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list <broker>:<port> \
--topic <topic-name> \
| awk -F  ":" '{sum += $3} END {print sum}'

Use https://prestodb.io/docs/current/connector/kafka-tutorial.html

A super SQL engine, provided by Facebook, that connects on several data sources (Cassandra, Kafka, JMX, Redis ...).

PrestoDB 作为带有可选工作线程的服务器运行(有一个独立模式,没有额外的工作线程) ,然后使用一个小的可执行 JAR (称为 presto CLI)进行查询。

一旦配置好 Presto 服务器,就可以使用传统的 SQL:

SELECT count(*) FROM TOPIC_NAME;

I actually use this for benchmarking my POC. The item you want to use ConsumerOffsetChecker. You can run it using bash script like below.

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

结果如下: enter image description here As you can see on the red box, 999 is the number of message currently in the topic.

Update: ConsumerOffsetChecker is deprecated since 0.10.0, you may want to start using ConsumerGroupCommand.

Apache Kafka 命令在主题的所有分区上获取未处理的消息:

kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group

印刷品:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

第6列是未处理的消息,把它们加起来如下:

kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group 2>/dev/null | awk 'NR>1 {sum += $6}
END {print sum}'

Awk 读取行,跳过标题行,并将第6列相加,最后打印出总和。

指纹

5

要获取为主题存储的所有消息,可以将使用者寻找到每个分区的流的开始和结束,并对结果进行求和

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
.collect(Collectors.toMap(Function.identity(), consumer::position));
consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());

使用卡夫卡2.11-1.0.0的 Java 客户端,你可以做以下事情:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());


// after each message, query the number of messages of the topic
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for(TopicPartition partition : offsets.keySet()) {
System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
}
}
}

产出是这样的:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13

在最新版本的卡夫卡管理器中,有一个题为 近期抵销总和的专栏。

enter image description here

有时候,人们感兴趣的是了解每个分区中的消息数量,例如,在测试自定义分区程序时。接下来的步骤已经通过了测试,可以与 Confluve3.2中的卡夫卡0.10.2.1-2协同工作。给定一个卡夫卡主题,kt和下面的命令行:

$ kafka-run-class kafka.tools.GetOffsetShell \
--broker-list host01:9092,host02:9092,host02:9092 --topic kt

它打印示例输出,显示三个分区中的消息数:

kt:2:6138
kt:1:6123
kt:0:6137

根据主题的分区数,行的数目可能大致相同。

运行以下命令(假设 kafka-console-consumer.sh在路径上) :

kafka-console-consumer.sh  --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true  \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"

摘自卡夫卡文档

在0.9.0.0中弃用

Kafka-consumer-offset-checker.sh (kafka.tools. consumeroffsetChecker)已经过时,接下来请使用 kafka-consumer-groups.sh (kafka.admin. consumerGroupCommand)实现此功能。

我运行卡夫卡代理与 SSL 都启用了服务器和客户端

Kafka-consumer-groups.sh —— bootstrap-server Broker _ IP: Port —— list —— command-config/tmp/ssl _ config Kafka-consumer-groups.sh —— bootstrap-server Broker _ IP: Port —— command-config/tmp/ssl _ config —— description —— group _ name _ x

where /tmp/ssl_config is as below

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password

如果您可以访问服务器的 JMX 接口,开始和结束偏移位于:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(you need to replace TOPICNAME & PARTITIONNUMBER). 请记住,您需要检查给定分区的每个副本,或者您需要找出哪一个代理是 给予分区的领导者(这可能随着时间的推移而改变)。

或者,您可以使用 卡夫卡消费者方法 beginningOffsetsendOffsets

我发现的最简单的方法是使用 Kafdrop REST API /topic/topicName并指定键: "Accept"/value: "application/json"头,以获得 JSON 响应。

This is documented here.

由于不再支持 ConsumerOffsetChecker,您可以使用此命令检查主题中的所有消息:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
--group my-group \
--bootstrap-server localhost:9092 \
--describe

其中 LAG是主题分区中的消息数:

enter image description here

你也可以尝试使用 卡夫卡卡特。这是一个开源项目,可以帮助您读取来自主题和分区的消息,并将它们打印到 stdout。下面是一个示例,它从 sample-kafka-topic主题中读取最后10条消息,然后退出:

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e

我也有同样的问题,我是这样做的,来自 Kotlin 的一位卡夫卡消费者:

val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
.map {
it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
}.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
.first()

非常粗糙的代码,因为我刚刚得到了这个工作,但基本上你想减去主题的开始偏移量从结束偏移量,这将是该主题的当前消息计数。

由于其他配置(清理策略、保留-ms 等)可能最终导致从主题中删除旧消息,因此不能仅仅依赖于结束偏移量。 偏移量只向前“移动”,因此开始偏移量将向前移动到更接近结束偏移量(或者最终向相同的值移动,如果主题现在不包含任何消息)。

Basically the end offset represents the overall number of messages that went through that topic, and the difference between the two represent the number of messages that the topic contains right now.

如果你需要为一个消费者群体中的所有消费者计算结果(或者为不同的消费者群体) ,另一个选择是使用管理客户端并从主题/分区偏移量中减去消费者群体偏移量,Kotlin 的代码示例:

val topicName = "someTopic"
val groupId = "theGroupId"
val admin = Admin.create(kafkaProps.buildAdminProperties()) // Spring KafkaProperties
val parts = admin.describeTopics(listOf(topicName)).values()[topicName]!!.get().partitions()
val topicPartitionOffsets = admin.listOffsets(parts.associate { TopicPartition(topicName, it.partition()) to OffsetSpec.latest() }).all().get()
val consumerGroupOffsets = admin.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata().get()
val highWaterMark = topicPartitionOffsets.map { it.value.offset() }.sum()
val consumerPos = consumerGroupOffsets.map { it.value.offset() }.sum()
val unProcessedMessages = highWaterMark - consumerPos

这里还有一个 LeYAUable 示例代码的工作版本,它只使用一个常规(非管理)客户端:

val partitions = consumer.partitionsFor("topicName")
.map { TopicPartition(it.topic(), it.partition()) }
val highWaterMark = consumer.endOffsets(partitions).values.sum()
val consumerPosition = consumer.beginningOffsets(partitions).values.sum()
val msgCount = highWaterMark - consumerPosition

这只会给你这个特定的消费者的补偿!通常需要注意的是,当一个主题被压缩时,这是不精确的。