Does Samza create partitions automatically when sending messages?

Problem description

If you use Samza's OutgoingMessageEnvelope to send a message, constructing it with this signature:

public OutgoingMessageEnvelope(SystemStream systemStream,
                               java.lang.Object partitionKey,
                               java.lang.Object key,
                               java.lang.Object message)
Constructs a new OutgoingMessageEnvelope from specified components.
Parameters:
systemStream - Object representing the appropriate stream of which this envelope will be sent on.
partitionKey - A key representing which partition of the systemStream to send this envelope on.
key - A deserialized key to be used for the message.
message - A deserialized message to be sent in this envelope.

and you call it within a stream task's process() method to route incoming messages to an appropriate partition, will Samza create the partitions for you when you send the message?

E.g.

MessageA = {"id": "idA", "key": "keyA", "body":"some details"}
MessageB = {"id": "idB", "key": "keyB", "body":"some more details"}

If I call the following within a stream task's process() method, where msg is a message instance:

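public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // ...
    // msg is the incoming message, assumed here to be deserialized into a Map<String, Object>
    Map<String, Object> msg = (Map<String, Object>) envelope.getMessage();
    String partitionKey = (String) msg.get("id");
    String key = (String) msg.get("key");
    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "PartitionedMessages"), partitionKey, key, msg));
    // ...
}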

Will this create partitions idA and idB automatically for me, or do I need to create these partitions before I send messages to them? I want to be able to route a message to an appropriate partition and also to use log compaction with a separate message key.

Solution

You must specify the number of partitions when you create the topic. You cannot dynamically add new partitions (well, you can, but it is not easy, and Samza does not do it automatically). If the topic does not exist, Samza should create it for you, but with the default number of partitions. Whether this happens depends on your settings (for example, the Kafka broker's auto.create.topics.enable and num.partitions), so test it in your environment.
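One reason adding partitions later is awkward: when the target partition is computed as hash(partitionKey) % number_of_partitions, changing the partition count sends existing keys to different partitions. The sketch below is a hypothetical illustration, not Samza or Kafka code; it uses String.hashCode() as a stand-in hash, whereas the real hash depends on the producer in use (Kafka's default partitioner, for instance, hashes the serialized key with murmur2).

```java
import java.util.List;

// Hypothetical illustration: String.hashCode() stands in for whatever hash
// the real producer/partitioner uses.
class RepartitionSketch {

    // Map a partition key to a partition index in [0, numPartitions).
    static int partitionFor(String partitionKey, int numPartitions) {
        // floorMod keeps the result non-negative even if hashCode() is negative
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        List<String> keys = List.of("idA", "idB", "idC", "idD", "idE", "idF");
        for (String key : keys) {
            int before = partitionFor(key, 3); // topic created with 3 partitions
            int after = partitionFor(key, 4);  // same topic after adding a 4th partition
            System.out.printf("%s: partition %d -> %d%s%n",
                    key, before, after, before == after ? "" : "  (moved)");
        }
    }
}
```

With these six ids, every key moves when the topic grows from 3 to 4 partitions, so messages with the same id written before and after the change end up in different partitions.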

Note, however, that the value msg["id"] does not specify the name of a partition. It is only used to compute the number of the target partition: the value is hashed to a number, which is then reduced modulo the number of partitions. Something like this (there are other algorithms; this is the basic one):

partitionID = hash(msg["id"]) % total_number_of_partitions

partitionID is always a non-negative integer smaller than total_number_of_partitions. This means it does not matter how many partitions you actually have: every message always ends up in one of them. The main idea is that if two messages have the same msg["id"], they end up in the same partition. That is usually what you want.
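The hash-modulo rule above can be sketched in Java as follows. This is a minimal sketch, assuming String.hashCode() as the hash (the producer you actually use may hash differently) and a hypothetical class name; Math.floorMod is used so the result stays non-negative even when the hash is negative.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of hash-modulo partition selection.
// Assumption: String.hashCode() stands in for the real partitioner's hash.
class PartitionerSketch {

    // partitionID = hash(partitionKey) % totalNumberOfPartitions, kept non-negative
    static int partitionFor(String partitionKey, int totalNumberOfPartitions) {
        return Math.floorMod(partitionKey.hashCode(), totalNumberOfPartitions);
    }

    public static void main(String[] args) {
        int totalNumberOfPartitions = 3;

        // The same id always maps to the same partition.
        System.out.println("idA -> " + partitionFor("idA", totalNumberOfPartitions));
        System.out.println("idA -> " + partitionFor("idA", totalNumberOfPartitions));
        System.out.println("idB -> " + partitionFor("idB", totalNumberOfPartitions));

        // Far more ids than partitions: every id still lands in some partition.
        Map<Integer, Integer> countsPerPartition = new TreeMap<>();
        for (int i = 0; i < 1000; i++) {
            int p = partitionFor("id" + i, totalNumberOfPartitions);
            countsPerPartition.merge(p, 1, Integer::sum);
        }
        System.out.println(countsPerPartition); // keys are always within 0..2
    }
}
```

No partition named "idA" or "idB" is ever created; the ids only select one of the partitions that already exist.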

Log compaction will work as you probably expect: it removes older messages with the same key from a given partition. However, if two messages with the same key end up in two different partitions, neither will be removed.
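A toy, in-memory model of the behavior described above (not Kafka code): compaction keeps only the latest value for each key, but only within one partition. The class name and data are hypothetical, and the model ignores details such as tombstones and the active segment.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of per-partition log compaction, for illustration only.
class CompactionSketch {

    // Keep only the latest value for each key in ONE partition's log,
    // preserving the log order of the surviving records.
    static List<Map.Entry<String, String>> compact(List<Map.Entry<String, String>> partitionLog) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : partitionLog) {
            // remove first so the survivor moves to the position of its latest write
            latest.remove(record.getKey());
            latest.put(record.getKey(), record.getValue());
        }
        List<Map.Entry<String, String>> compacted = new ArrayList<>();
        latest.forEach((key, value) -> compacted.add(Map.entry(key, value)));
        return compacted;
    }

    public static void main(String[] args) {
        // keyA was written twice to partition 0 and once to partition 1.
        List<Map.Entry<String, String>> partition0 = List.of(
                Map.entry("keyA", "v1"), Map.entry("keyB", "v1"), Map.entry("keyA", "v2"));
        List<Map.Entry<String, String>> partition1 = List.of(Map.entry("keyA", "v3"));

        System.out.println(compact(partition0)); // [keyB=v1, keyA=v2]
        System.out.println(compact(partition1)); // [keyA=v3]: survives, it lives in another partition
    }
}
```

Because compaction works per partition, using msg["id"] as the partition key and a separate message key for compaction only deduplicates records whose keys also share a partition.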

FYI, you can use kafkacat to find out the number of partitions of a topic, along with other useful metadata.
