如何用卡夫卡(超过15MB)发送大邮件?

我使用 Java Producer API 向 Kafka V. 0.8发送字符串消息。 如果消息大小约为15MB,我得到一个 MessageSizeTooLargeException。 我已经尝试将 message.max.bytes设置为40MB,但仍然出现异常。

(异常出现在生产者中,我在这个应用程序中没有使用者。)

我要怎么做才能消除这个异常?

我的示例生成器配置

private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}

错误日志:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]


kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
236943 次浏览

需要记住的一点是,message.max.bytes属性必须是具有使用者 fetch.message.max.bytes属性的 同步。提取大小必须至少与最大消息大小一样大,否则可能会出现生产者发送的消息大于消费者使用/提取的消息的情况。也许值得一看。
你用的是哪个版本的卡夫卡?还提供了一些更详细的跟踪,您正在得到。有没有这样的东西... < code > payload size of xxxx large 超过1000000 在日志中出现?

您需要调整三个(或四个)属性:

  • 消费者端: fetch.message.max.bytes-这将决定消费者可以获取的消息的最大大小。
  • 代理端: replica.fetch.max.bytes-这将允许代理中的副本在集群中发送消息,并确保正确地复制消息。如果这太小,那么消息将永远不会被复制,因此,使用者将永远不会看到消息,因为消息将永远不会被提交(完全复制)。
  • 代理端: message.max.bytes-这是代理可以从生产者那里接收到的消息的最大大小。
  • 代理端(每个主题) : max.message.bytes-这是代理允许附加到主题的最大消息大小。预压缩验证了此大小。(默认为经纪人的 message.max.bytes。)

我发现了关于第二条的困难之处——你不会从卡夫卡那里得到任何异常、信息或警告,所以当你发送大型信息时一定要考虑到这一点。

您需要重写以下属性:

Broker Configs ($KAFKA _ HOME/config/server.properties)

  • Plica.fetch.max.byte
  • Message.max.byte

用户配置($KAFKA _ HOME/config/Consumer.properties)
这个步骤对我不起作用,我把它添加到消费者应用程序中,它工作得很好

  • Metch.message.max.byte

重启服务器。

更多信息请参考以下文档: Http://kafka.apache.org/08/configuration.html

这个想法是有同样大小的信息被发送从卡夫卡生产者卡夫卡经纪人,然后接收卡夫卡消费者即。

卡夫卡生产者—— > 卡夫卡经纪人—— > 卡夫卡消费者

假设需求是发送15MB 的消息,那么 制片人布洛克消费者这三者都需要同步。

Kafka Producer 发送15MB --> 卡夫卡经纪人允许/存储15MB --> 卡夫卡消费者接收15MB

因此,环境应该是:

A)经纪:

message.max.bytes=15728640
replica.fetch.max.bytes=15728640

B)关于消费者:

fetch.message.max.bytes=15728640

笑人的回答相比,卡夫卡0.10新消费者需要做出一些小的改变:

  • 经纪人: 没有变化,你仍然需要增加属性 message.max.bytesreplica.fetch.max.bytesmessage.max.bytes必须等于或小于 replica.fetch.max.bytes(*)。
  • 增加 max.request.size以发送更大的消息。
  • 使用者: 增加 max.partition.fetch.bytes以接收更大的消息。

(*)阅读评论,以了解更多关于 message.max.bytes < = replica.fetch.max.bytes

“笑的人”给出的答案相当准确。但是,我还是想给出一个我从 Kafka 专家 abc0那里学到的建议。我们积极地将这个解决方案应用到我们的实时系统中。

卡夫卡不是用来处理大型信息的。

您的 API 应该使用云存储(例如,AWS S3) ,并简单地将对 S3的引用推送到 Kafka 或任何其他消息代理。您需要找到一个地方来保存数据,无论是网络驱动器还是其他完全不同的东西,但是它不应该是消息代理。

如果您不想继续使用上面推荐的可靠解决方案,

消息的最大大小为1MB (代理中的设置称为 message.max.bytes) 阿帕奇 · 卡夫卡。如果您真的非常需要它,那么您可以增加它的大小,并确保为生产者和消费者增加网络缓冲区。

如果你真的关心如何分割你的消息,确保每个消息分割有完全相同的关键字,以便它被推到相同的分区,你的消息内容应该报告一个“ part id”,以便你的消费者可以完全重建消息。

如果消息是基于文本的,则尝试压缩数据,这可能会减少数据大小,但不会神奇地减少。

同样,你必须使用一个外部系统来存储这些数据,只需推送一个外部引用到卡夫卡。这是一种非常常见的架构,您应该使用它,并且应该得到广泛的接受。

请记住,卡夫卡只有在信息数量巨大而不是大小的情况下才能发挥最佳作用。

资料来源: https://www.quora.com/How-do-I-send-Large-messages-80-MB-in-Kafka

对于使用 landoop kafka 的人: 您可以在环境变量中传递如下配置值:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
-e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

这将在代理上设置 topic.max.message.bytesreplica.fetch.max.bytes

如果你使用的是 rdkafka,那么在生成器配置中传递 message.max.bytes,例如:

  const producer = new Kafka.Producer({
'metadata.broker.list': 'localhost:9092',
'message.max.bytes': '15728640',
'dr_cb': true
});

同样,对于消费者来说,

  const kafkaConf = {
"group.id": "librd-test",
"fetch.message.max.bytes":"15728640",
... .. }

我认为,这里的大多数答案都有点过时或者不完整。

为了参考 Sacha Vetter 的回答(包括对 卡夫卡0.10的更新) ,我想提供一些额外的信息和指向官方文档的链接。


生产者配置:

代理/主题配置:

由于我可以作为 Kafka 集群的客户端自己配置主题(例如使用 管理员客户端) ,所以我总是更喜欢主题受限的配置。我可能对经纪人配置本身没有任何影响。


在上面的答案中,还提到了一些必要的配置:

来自文档: 这不是绝对最大值,如果提取的第一个非空分区中的第一个记录批大于这个值,仍然会返回记录批以确保进度

来自文档: “消费者会分批获取记录。如果提取的第一个非空分区中的第一个记录批量大于这个限制,批量仍将被返回,以确保消费者能够取得进展。”

来自文档: 消费者分批获取记录,如果获取的第一个非空分区中的第一个记录批量大于这个值,记录批量仍将被返回,以确保消费者能够取得进展


结论: 为了处理消息,不需要更改关于获取消息的配置,这些配置大于这些配置的默认值(在小型设置中进行了测试)。也许,消费者可能总是得到批量的尺寸1。但是,必须设置第一个块中的两个配置,如前面的答案所述。

这个说明不应该说明任何关于性能的内容,也不应该建议设置或不设置这些配置。最佳值必须根据具体的计划吞吐量和数据结构分别进行评估。

下面是我如何使用 kafka-python==2.0.2成功地发送高达100mb 的数据:

经纪人:

consumer = KafkaConsumer(
...
max_partition_fetch_bytes=max_bytes,
fetch_max_bytes=max_bytes,
)

生产者(见最后的最终解决方案) :

producer = KafkaProducer(
...
max_request_size=KafkaSettings.MAX_BYTES,
)

然后:

producer.send(topic, value=data).get()

在发送这样的数据之后,出现了以下例外情况:

MessageSizeTooLargeError: The message is n bytes when serialized which is larger than the total memory buffer you have configured with the buffer_memory configuration.

最后,我增加了 buffer_memory(默认为32mb)来接收另一端的消息。

producer = KafkaProducer(
...
max_request_size=KafkaSettings.MAX_BYTES,
buffer_memory=KafkaSettings.MAX_BYTES * 3,
)