在卡夫卡的作品中,有没有一种方法可以清除这个话题?

我把一个太大的消息推到我本地机器的kafka消息主题中,现在我得到一个错误:

kafka.common.InvalidMessageSizeException: invalid message size

增加fetch.size在这里并不理想,因为我实际上不想接受那么大的消息。

288514 次浏览

更新:这个答案与Kafka 0.6相关。对于Kafka 0.8和以后参见@Patrick的回答。

是的,停止kafka,手动删除相应子目录下的所有文件(在kafka数据目录下很容易找到)。kafka重启后,主题将为空。

下面是删除名为MyTopic的主题的步骤:

  1. 描述主题,并记下代理id
  2. 为列出的每个代理ID停止Apache Kafka守护进程。
  3. 连接到每个代理(从步骤1开始),并删除主题数据文件夹,例如rm -rf /tmp/kafka-logs/MyTopic-0。对其他分区和所有副本重复此操作
  4. 删除主题元数据:zkCli.sh,然后rmr /brokers/MyTopic
  5. 为每台停止的机器启动Apache Kafka守护进程

如果你错过了第3步,那么Apache Kafka将继续报告当前的主题(例如当你运行kafka-list-topic.sh时)。

使用Apache Kafka 0.8.0测试。

Thomas的建议很好,但不幸的是,旧版本的Zookeeper(例如3.3.6)中的zkCli似乎不支持rmr。例如,比较现代的动物园管理员版本3.3中的命令行实现。

如果你面对的是旧版本的Zookeeper,一个解决方案是使用一个客户端库,比如Python的zc.zk。对于不熟悉Python的人,您需要使用皮普easy_install安装它。然后启动Python shell (python),你可以:

import zc.zk
zk = zc.zk.ZooKeeper('localhost:2181')
zk.delete_recursive('brokers/MyTopic')

甚至

zk.delete_recursive('brokers')

如果你想从卡夫卡中删除所有的主题。

最简单的方法是将各个日志文件的日期设置为比保留期更早的日期。然后经纪人会在几秒钟内为你清理并移除它们。这有几个优点:

  1. 不需要关闭代理,这是一个运行时操作。
  2. 避免出现无效偏移异常的可能性(下文将详细介绍)。

根据我使用卡夫卡0.7的经验。X,删除日志文件并重新启动代理可能会导致某些消费者出现无效偏移异常。这是因为代理将在零处重新启动偏移量(在没有任何现有日志文件的情况下),并且先前从主题消费的消费者将重新连接以请求特定的[曾经有效的]偏移量。如果这个偏移量刚好超出了新主题日志的范围,那么不会造成任何损害,使用者可以在开始或结束时重新开始。但是,如果偏移量落在新主题日志的范围内,代理将尝试获取消息集,但会失败,因为偏移量与实际消息不一致。

还可以通过清除zookeeper中针对该主题的消费者偏移量来缓解这一问题。但如果你不需要一个处女主题,只是想删除现有的内容,那么简单地“触摸”一些主题日志要比停止代理、删除主题日志和清除某些zookeeper节点容易得多,也更可靠。

要清除队列,可以删除主题:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创造它:

bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 --topic test

使用应用程序组(GroupName应该与应用程序kafka组名相同)清理来自特定主题的所有消息。

./kafka-path/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicName --from-beginning --group application-group

临时更新主题的保留时间为1秒:

kafka-topics.sh \
--zookeeper <zkhost>:2181 \
--alter \
--topic <topic name> \
--config retention.ms=1000

在更新的Kafka版本中,你也可以用kafka-configs --entity-type topics来实现

kafka-configs.sh \
--zookeeper <zkhost>:2181 \
--entity-type topics \
--alter \
--entity-name <topic name> \
--add-config retention.ms=1000

然后等待清除生效(持续时间取决于主题的大小)。一旦清除,恢复之前的retention.ms值。

在Kafka 0.8.2中测试,用于快速启动示例: 首先,添加一行到服务器。配置文件夹下的属性文件:

. Properties
delete.topic.enable=true

然后,执行以下命令:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

然后重新创建它,以便客户端继续对空主题进行操作

虽然公认的答案是正确的,但该方法已被弃用。主题配置现在应该通过kafka-configs来完成。

kafka-configs --zookeeper localhost:2181 --entity-type topics --alter --add-config retention.ms=1000 --entity-name MyTopic

通过该方法设置的配置可以通过命令显示

kafka-configs --zookeeper localhost:2181 --entity-type topics --describe --entity-name MyTopic

除了更新用户留存。Ms和保留率。字节,我注意到主题清理策略应该是“删除”;(默认值),如果“;compact"”,它将更长时间地保留消息,也就是说,如果它是“;compact"”,你还必须指定delete.retention.ms

./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics Configs for topics:test-topic-3-100 are retention.ms=1000,delete.retention.ms=10000,cleanup.policy=delete,retention.bytes=1

还得监控最早/最新的偏移量,以确认这一成功发生,也可以检查du -h /tmp/kafka-logs/test-topic-3-100-*

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -1 | awk -F ":" '{sum += $3} END {print sum}' 26599762

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list "BROKER:9095" --topic test-topic-3-100 --time -2 | awk -F ":" '{sum += $3} END {print sum}' 26599762

另一个问题是,你必须获得当前的配置第一个,这样你就会记得在删除成功后恢复: ./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name test-topic-3-100 --entity-type topics < / p >

kafka没有清除/清理主题(队列)的直接方法,但可以通过删除该主题并重新创建它来实现。

首先,确保服务器。属性文件有,如果没有,则添加delete.topic.enable=true

then,删除主题 bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic myTopic < / p >

然后重新创建它。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic myTopic --partitions 10 --replication-factor 2

有时,如果您有一个饱和的集群(分区太多,或使用加密的主题数据,或使用SSL,或控制器在一个坏的节点上,或连接不稳定),清除该主题将花费很长时间。

我遵循这些步骤,特别是如果你使用Avro。

1:使用kafka工具运行:

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=1 --entity-name <topic-name>

2:运行:

kafka-console-consumer --consumer-property security.protocol=SSL --consumer-property ssl.truststore.location=/etc/schema-registry/secrets/trust.jks --consumer-property ssl.truststore.password=password --consumer-property ssl.keystore.location=/etc/schema-registry/secrets/identity.jks --consumer-property ssl.keystore.password=password --consumer-property ssl.key.password=password --bootstrap-server broker01.kafka.com:9092 --topic <topic-name> --new-consumer --from-beginning

3:当主题为空时,将主题保留设置回初始设置。

kafka-configs.sh --alter --entity-type topics --zookeeper zookeeper01.kafka.com --add-config retention.ms=604800000 --entity-name <topic-name>

希望这能帮助到一些人,因为它不容易宣传。

另一种手动清除主题的方法是:

在经纪人中:

  1. stop kafka broker
    sudo service kafka stop < / > <李> 删除所有分区日志文件(应该在所有代理上执行) < br > 李sudo rm -R /kafka-storage/kafka-logs/<some_topic_name>-* < / >

动物园管理员:

  1. run zookeeper命令行接口
    .
    sudo /usr/lib/zookeeper/bin/zkCli.sh < / >
  2. 使用zkCli删除主题元数据
    rmr /brokers/topic/<some_topic_name> < / >

在经纪人那里:

  1. 重启代理服务
    sudo service kafka start < / >

来自kafka 1.1

清除一个主题

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --add-config retention.ms=100

至少等待1分钟,以确保kafka清除主题 删除配置,然后使用默认值

bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name tp_binance_kline --delete-config retention.ms
./kafka-topics.sh --describe --zookeeper zkHost:2181 --topic myTopic

这应该给retention.ms配置。然后您可以使用上面的alter命令将其更改为1秒(稍后恢复为默认值)。

Topic:myTopic   PartitionCount:6        ReplicationFactor:1     Configs:retention.ms=86400000

在Java中,使用新的AdminZkClient代替已弃用的AdminUtils:

  public void reset() {
try (KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", false, 200_000,
5000, 10, Time.SYSTEM, "metricGroup", "metricType")) {


for (Map.Entry<String, List<PartitionInfo>> entry : listTopics().entrySet()) {
deleteTopic(entry.getKey(), zkClient);
}
}
}


private void deleteTopic(String topic, KafkaZkClient zkClient) {


// skip Kafka internal topic
if (topic.startsWith("__")) {
return;
}


System.out.println("Resetting Topic: " + topic);
AdminZkClient adminZkClient = new AdminZkClient(zkClient);
adminZkClient.deleteTopic(topic);


// deletions are not instantaneous
boolean success = false;
int maxMs = 5_000;
while (maxMs > 0 && !success) {
try {
maxMs -= 100;
adminZkClient.createTopic(topic, 1, 1, new Properties(), null);
success = true;
} catch (TopicExistsException ignored) {
}
}


if (!success) {
Assert.fail("failed to create " + topic);
}
}


private Map<String, List<PartitionInfo>> listTopics() {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
props.put("group.id", "test-container-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");


KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
Map<String, List<PartitionInfo>> topics = consumer.listTopics();
consumer.close();


return topics;
}

根据@steven appleyard的回答,我在Kafka 2.2.0上执行了以下命令,它们对我有用。

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --describe


bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --add-config retention.ms=1000


bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name <topic-name> --alter --delete-config retention.ms

这里有很多很棒的答案,但在其中,我没有找到一个关于docker的答案。我花了一些时间来弄清楚在这种情况下使用代理容器是错误的(显然!!)

## this is wrong!
docker exec broker1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
Exception in thread "main" kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING
at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:258)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:254)
at kafka.zookeeper.ZooKeeperClient.<init>(ZooKeeperClient.scala:112)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1826)
at kafka.admin.TopicCommand$ZookeeperTopicService$.apply(TopicCommand.scala:280)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

我应该使用zookeeper:2181而不是--zookeeper localhost:2181作为我的撰写文件

## this might be an option, but as per comment below not all zookeeper images can have this script included
docker exec zookeper1 kafka-topics --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000

正确的命令应该是

docker exec broker1 kafka-configs --zookeeper zookeeper:2181 --alter --entity-type topics --entity-name dev_gdn_urls --add-config retention.ms=12800000

希望能节省一些人的时间。

另外,请注意消息不会立即删除,而是在关闭日志段时删除。

下面的命令可以删除kafka topic中所有已有的消息:

kafka-delete-records --bootstrap-server <kafka_server:port> --offset-json-file delete.json

删除的结构。Json文件应该如下:

< p > { “partitions":( { “topic":“foo" “partition": 1、 “offset": 1 } ), “version": 1 } < / p >

where offset:-1将删除所有记录 这个命令已经在kafka 2.0.1

中测试过了

你是否考虑过让你的应用只使用一个新的重命名主题?(例如,一个主题的名称与原来的主题一样,但有一个&;1"附在末尾)。

这也会给你的应用一个新鲜干净的主题。

如果你想在Java应用程序中以编程方式做到这一点,你可以使用AdminClient的API deleteRecords。使用AdminClient可以删除分区和偏移量级别上的记录。

根据JavaDocs,此操作由0.11.0.0或更高版本的代理支持。

这里有一个简单的例子:

String brokers = "localhost:9092";
String topicName = "test";
TopicPartition topicPartition = new TopicPartition(topicName, 0);
RecordsToDelete recordsToDelete = RecordsToDelete.beforeOffset(5L);


Map<TopicPartition, RecordsToDelete> topicPartitionRecordToDelete = new HashMap<>();
topicPartitionRecordToDelete.put(topicPartition, recordsToDelete);


// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);


try {
adminClient.deleteRecords(topicPartitionRecordToDelete).all().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
adminClient.close();
}

如果你正在使用confluentinc/cp-kafka容器,这里是删除主题的命令。

docker exec -it <kafka-container-id> kafka-topics --zookeeper zookeeper:2181 --delete --topic <topic-name>

成功的回应:

Topic <topic-name> is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# you have to enable this on config
sudo echo "delete.topic.enable=true" >> /opt/kafka/config/server.properties
sudo systemctl stop kafka
sudo systemctl start kafka
# purge the topic
/opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic flows


# create the topic
# /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:2181 --replication-factor 1 --partitions 1 --topic Test
# list the topic
# /opt/kafka/bin/kafka-console-consumer.sh  localhost:9092 --topic flows --from-beginning

user644265在此回答中建议的临时减少主题保留时间的变通方法仍然有效,但最近版本的kafka-configs将警告--zookeeper选项已弃用:

警告:——zookeeper已弃用,并将在Kafka的未来版本中删除

使用--bootstrap-server代替;例如

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --add-config retention.ms=100

而且

kafka-configs --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name my_topic --delete-config retention.ms

我使用的是Kafka 2.13工具。现在——zookeeper是kafka-topics.sh的不可识别选项。删除主题。

bin/kafka-topics.sh --bootstrap-server [kafka broker]:9092 --delete --topic [topic name]

只是要考虑到,如果你在删除的主题中有很多数据,再次创建相同的主题可能需要一段时间。当你尝试创建相同的主题时,你可能会得到这样的错误:

ERROR org.apache.kafka.common.errors.TopicExistsException: Topic

. '[topic name]'标记为删除

以防有人正在寻找一个更新的答案(在2022年),我发现以下将适用于Kafka 3.3.1版本。这将改变“your-topic”的配置;因此,消息将保留1000毫秒。在清除消息之后,可以将其设置为不同的值。

bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name your-topic  --alter --add-config retention.ms=1000