Is key required as part of sending messages to Kafka?

KeyedMessage<String, byte[]> keyedMessage = new KeyedMessage<String, byte[]>(request.getRequestTopicName(), SerializationUtils.serialize(message));
producer.send(keyedMessage);

Currently, I am sending messages without any key as part of keyed messages, will it still work with delete.retention.ms? Do I need to send a key as part of the message? Is this good to make key as part of the message?

145337 次浏览

如果您需要密钥的强顺序,并且正在开发类似于状态机的东西,那么密钥通常是有用的/必需的。如果您要求具有相同键的消息(例如,一个唯一的 id)总是以正确的顺序出现,那么在消息中附加一个键将确保具有相同键的消息总是到达主题中的相同分区。卡夫卡保证分区内的顺序,但不保证主题中的分区之间的顺序,因此不提供密钥(这将导致分区之间的循环分布)将不会保持这种顺序。

在状态机的情况下,键可以与 Log.cleer.able一起使用,以去除具有相同键的条目的重复。在这种情况下,Kafka 假设您的应用程序只关心给定密钥的最新实例,而日志清理器只有在密钥不为空时才删除给定密钥的旧副本。这种形式的日志压缩由 保留属性控制,并且需要键。

或者,更常见的属性 日志,保留时间,小时(默认情况下启用)通过删除过期日志的完整段来工作。在这种情况下,不必提供密钥。卡夫卡将简单地删除比给定保留期更早的日志块。

这就是所有要说的,如果您已经启用了 原木压实法原木压实法,或者需要对具有相同密钥的消息进行严格的排序,那么您肯定应该使用密钥。否则,空键可以提供更好的分发,并防止在某些键可能比其他键出现得更多的情况下出现潜在的热点问题。

博士 不,给卡夫卡发信息不需要钥匙,但是..。


除了这个非常有用的被接受的答案之外,我还想补充一些更多的细节

分区

默认情况下,Kafka 使用消息的键来选择它写入的主题的分区。这在 DefaultPartitioner中是通过

kafka.common.utils.Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

如果没有提供密钥,那么卡夫卡将以循环方式对数据进行分区。

在卡夫卡中,可以通过扩展 Partitioner类来创建自己的 Partifier。为此,您需要覆盖具有以下签名的 partition方法:

int partition(String topic,
Object key,
byte[] keyBytes,
Object value,
byte[] valueBytes,
Cluster cluster)

通常,卡夫卡消息的 钥匙用于选择分区,返回值(类型为 int)是分区号。如果没有键,则需要依赖于处理起来可能要复杂得多的值。

点菜

正如在给定的答案中所述,卡夫卡保证只在分区级别上对消息进行排序。

假设您希望将客户的金融交易存储在具有两个分区的卡夫卡主题中。消息可能看起来像(key: value)

null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": -1337}
null:{"customerId": 1, "changeInBankAccount": +200}

由于我们没有定义键,所以这两个分区看起来应该是这样的

// partition 0
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}
null:{"customerId": 1, "changeInBankAccount": +200}


// partition 1
null:{"customerId": 2, "changeInBankAccount": +100}
null:{"customerId": 1, "changeInBankAccount": -1337}

你的消费者阅读这个话题可能会告诉你,在一个特定的时间结余是600,虽然从来没有这种情况!仅仅因为它在读取分区1中的消息之前读取分区0中的所有消息。

使用有意义的键(如 customerId)可以避免这种情况,因为分区应该是这样的:

// partition 0
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": +200}
1:{"customerId": 1, "changeInBankAccount": -1337}
1:{"customerId": 1, "changeInBankAccount": +200}


// partition 1
2:{"customerId": 2, "changeInBankAccount": +100}

请记住,只有将生产者配置 max.in.flight.requests.per.connection设置为 1,才能保证分区内的排序。但是,该配置的默认值是 5,它被描述为:

”在阻塞之前,客户端在单个连接上发送的未确认请求的最大数量。请注意,如果将此设置设置为大于1,并且发送失败,则由于重试,存在消息重新排序的风险(即,如果启用了重试)。”

你可以在 Stackoverflow 的另一篇文章中找到更多的细节。

原木压实

如果没有密钥作为消息的一部分,您将无法将主题配置 cleanup.policy设置为 compacted。根据 文件的“日志压缩确保卡夫卡将始终保留至少最后一个已知的值,每个消息键在日志中的数据为一个单一的主题分区。”.

如果没有任何键,这个漂亮而有用的设置将不可用。

键的用法

在现实生活中的用例中,卡夫卡消息的关键字可能会对您的业务逻辑的性能和清晰度产生巨大的影响。

例如,可以自然地使用键来对数据进行分区。由于您可以控制使用者从特定的分区读取,因此这可以作为一个有效的过滤器。此外,该键还可以包含关于消息实际值的一些元数据,这些数据有助于您控制后续处理。键通常比值小,因此解析键比解析整个值更方便。同时,您可以应用所有序列化和模式注册,就像您的值和键一样。

值得注意的是,还有 的概念,可以用来存储信息,参见 文件

带有消息的键基本上被发送来获取特定字段的消息顺序。

  • 如果 key = null,则数据循环发送(发送到分布式 env 中的不同分区和不同代理)。当然还有同一个话题。).
  • 如果发送了一个密钥,那么该密钥的所有消息将总是到同一个分区。

解释和举例

  • Key 可以是任何字符串或整数,等等。
  • 因此 Employee _ id 123将总是转到分区0,employee _ id 345将总是转到分区1。这是由密钥哈希算法决定的,这取决于分区的数量。
  • 如果您不发送任何密钥,那么消息可以使用循环技术进入任何分区。