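Is it possible to add partitions to an existing topic in Kafka 0.8.2

I have a Kafka cluster running with two partitions. I am looking for a way to increase the partition count to 3. However, I don't want to lose the existing messages in the topic. I tried stopping Kafka, modifying the server.properties file to increase the number of partitions to 3, and restarting Kafka. However, that does not seem to change anything. Using Kafka's ConsumerOffsetChecker, I still see that it is using only 2 partitions. The Kafka version I am using is 0.8.2.2. In version 0.8.1 there used to be a script called kafka-add-partitions.sh, which I guess might do the trick. However, I don't see any such script in 0.8.2.

  • Is there any way to do this?

I did try creating a whole new topic, and for that one it does seem to use 3 partitions, in line with the change in the server.properties file. However, for existing topics, it doesn't seem to care.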

Looks like you can use this script instead:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40

In the code, it looks like they do the same thing:

 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(topic, partitionReplicaList, zkClient, true)

kafka-topics.sh executes this same piece of code, as does the AddPartitionsCommand used by the kafka-add-partitions.sh script.

However, you have to be aware of re-partitioning when you use keyed messages:

Be aware that one use case for partitions is to semantically partition data, and adding partitions doesn't change the partitioning of existing data so this may disturb consumers if they rely on that partition. That is if data is partitioned by hash(key) % number_of_partitions then this partitioning will potentially be shuffled by adding partitions but Kafka will not attempt to automatically redistribute data in any way.
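To make the shuffling concrete, here is a minimal, self-contained sketch of how hash(key) % number_of_partitions changes when a topic grows from 2 to 3 partitions. It is only an illustration: `String.hashCode()` is a stand-in assumption for the producer's real hash function (as far as I know, the Java producer's default partitioner uses a murmur2 hash of the serialized key bytes), and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionShuffleDemo {

    // Illustrative stand-in for hash(key) % number_of_partitions.
    // ASSUMPTION: String.hashCode() replaces the producer's actual hash function.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns the keys whose target partition differs between the two counts.
    static List<String> movedKeys(List<String> keys, int oldCount, int newCount) {
        List<String> moved = new ArrayList<>();
        for (String key : keys) {
            if (partitionFor(key, oldCount) != partitionFor(key, newCount)) {
                moved.add(key);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d", "e", "f");
        for (String key : keys) {
            System.out.printf("%s: partition %d of 2, partition %d of 3%n",
                    key, partitionFor(key, 2), partitionFor(key, 3));
        }
        System.out.println("keys that move: " + movedKeys(keys, 2, 3));
    }
}
```

Messages already written stay where they are, so after the change a key that "moves" has its old messages in one partition and its new messages in another. Consumers that rely on per-key ordering need to account for that.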

In my case the value zk_host:port/chroot for parameter --zookeeper threw the following exception:

ERROR java.lang.IllegalArgumentException: Topic my_topic_name does not exist on ZK path zk_host:port/chroot.

So, I tried the following and it worked:

 bin/kafka-topics.sh --alter --zookeeper zk_host:port --topic my_topic_name --partitions 10

If you are using Kafka on Windows, try this command to alter a topic or add partitions to it:

.\bin\windows\kafka-topics.bat --alter --zookeeper localhost:2181 --topic TopicName --partitions 20

or

.\bin\windows\kafka-topics.bat --alter --zookeeper localhost:2181 --topic TopicName --replica-assignment 0:1:2,0:1:2,0:1:2,2:1:0 --partitions 10

For anyone who wants a solution for newer Kafka versions, follow this method.

Kafka's data retention and transfer behavior depends on partitions, so be careful about the effects of increasing the partition count (newer Kafka versions print a warning about this). Try to avoid a configuration in which one broker is the leader for too many partitions.

There is a simple three-step approach to this.

Step 1: Increase the partitions in topics

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic testKafka_5 --partitions 6

Step 2: Create a partitioning JSON file for the given topic

{
  "version": 1,
  "partitions": [
    {"topic": "testKafka_5", "partition": 0, "replicas": [0, 1, 2]},
    {"topic": "testKafka_5", "partition": 1, "replicas": [2, 1, 0]},
    {"topic": "testKafka_5", "partition": 2, "replicas": [1, 2, 0]},
    {"topic": "testKafka_5", "partition": 3, "replicas": [0, 1, 2]},
    {"topic": "testKafka_5", "partition": 4, "replicas": [2, 1, 0]},
    {"topic": "testKafka_5", "partition": 5, "replicas": [1, 2, 0]}
  ]
}

Create the file with the new partitions and replicas. It is better to spread replicas across different brokers, but they must all belong to the same cluster. Take latency into account for distant replicas. Transfer the file to your Kafka host.
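For topics with many partitions, writing that file by hand gets tedious. The following is a rough sketch of generating it, assuming a simple round-robin rotation of replicas across the given broker ids. That rotation is my own choice for illustration and differs from the hand-written assignment in Step 2, and the class and method names are hypothetical. Since the first replica in each list is the preferred leader, rotating also spreads leadership across brokers.

```java
import java.util.ArrayList;
import java.util.List;

public class ReassignmentJson {

    // Builds a reassignment JSON string in the same shape as the Step 2 file.
    // ASSUMPTION: replicas are rotated round-robin over brokerIds; adjust to your cluster.
    static String build(String topic, int partitions, int[] brokerIds, int replicationFactor) {
        if (replicationFactor > brokerIds.length) {
            throw new IllegalArgumentException("replication factor exceeds broker count");
        }
        List<String> entries = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<String> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                // Shift the starting broker by one for each partition.
                replicas.add(String.valueOf(brokerIds[(p + r) % brokerIds.length]));
            }
            entries.add(String.format("{\"topic\":\"%s\",\"partition\":%d,\"replicas\":[%s]}",
                    topic, p, String.join(",", replicas)));
        }
        return "{\"version\":1,\"partitions\":[" + String.join(",", entries) + "]}";
    }

    public static void main(String[] args) {
        // Six partitions over brokers 0, 1, 2 with replication factor 3.
        System.out.println(build("testKafka_5", 6, new int[] {0, 1, 2}, 3));
    }
}
```

Redirect the output to a file and pass that file to --reassignment-json-file in Step 3.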

Step 3: Reassign partitions and verify

./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file bin/increase-replication-factor.json  --execute


./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file bin/increase-replication-factor.json --verify

You can check the effects of your change using the --describe option of kafka-topics.sh.

I think this question is a bit old, but still I will answer.

If you have a Kafka topic but want to change the number of partitions or replicas, you can use a streaming transformation to automatically stream all the messages from the original topic into a new Kafka topic which has the desired number of partitions or replicas.

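Code to increase the Kafka partition count in Spring Boot using AdminClient:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;

// Topic is the application's own class; only getName() is used here.
public void updatePartitionCount(Topic topic, AdminClient adminClient) {
    Map<String, NewPartitions> newPartitions = new HashMap<>();
    // Raise the topic's total partition count to 5 (the count can only grow).
    newPartitions.put(topic.getName(), NewPartitions.increaseTo(5));
    // The call is asynchronous; use the returned result's all().get() to wait for completion.
    adminClient.createPartitions(newPartitions);
    System.out.println("in partition count update");
}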

In kafka_2.13-3.2.0, this worked for me:

/bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic apache_event_log_topic --partitions 4