Creating a new Jet custom Partitioner


Problem description

My use case requires reading messages from a Kafka topic and processing them in the natural order in which they were published to Kafka.

The Kafka producer is responsible for publishing each group of messages, in order, to a single Kafka topic-partition, and I need to process each group of messages in the same Vertex processor, in that same order.

The image above represents the basic idea. There are a few KafkaSource processors reading from Kafka, and an edge connected to a vertex that decodes the Kafka messages, and so on.

I could use the Kafka message key as the partitioning key, but I think I would end up with unbalanced decode processors.

With that in mind:

  • How can I create a new Partitioner? I couldn't find any example to inspire me.
  • In the new Partitioner, how can I identify the KS processor that emitted the message? I would like a 1-to-1 relationship between the previous vertex's processors and the next vertex's processors; for instance, KS#0 always sends its messages to Decode#0, KS#1 to Decode#1, and so on.
  • Do I need a new partitioner for that, or is there some out-of-the-box functionality to achieve it?

Answer

You don't need a partitioner for this. Edge.isolated(), together with equal local parallelism, is designed for exactly this purpose:

dag.edge(between(kafkaSource, decode).isolated());

In this case, one instance of the source processor is bound to exactly one instance of the target processor, and the ordering of items is preserved. Keep in mind that a single Kafka source processor can take items from more than one Kafka partition, so you have to track the Kafka partition id. Even if you make the total number of Jet processors equal to the number of Kafka partitions, you can't rely on it: if one of the members fails and the job is restarted, the total number of Jet processors will decrease, but the number of Kafka partitions won't.
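As a concrete illustration of the partition-id tracking mentioned above, here is a minimal, Jet-independent sketch. The class and method names are hypothetical (not part of any Jet API): it simply remembers the last offset seen per Kafka partition id, so a downstream processor that owns several partitions can verify that items from each partition still arrive in publish order.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks the last offset seen for each Kafka
// partition id, so per-partition ordering can be verified even when one
// Jet processor consumes several partitions.
class PartitionOrderTracker {
    private final Map<Integer, Long> lastOffsetByPartition = new HashMap<>();

    /**
     * Returns true if the offset is strictly increasing for its partition
     * (i.e. in publish order), false otherwise. The recorded offset is
     * only advanced for in-order items.
     */
    boolean observe(int partitionId, long offset) {
        Long last = lastOffsetByPartition.get(partitionId);
        if (last != null && offset <= last) {
            return false; // out of order within this partition
        }
        lastOffsetByPartition.put(partitionId, offset);
        return true;
    }
}
```

Note that offsets from different partitions may interleave arbitrarily; only the per-partition sequence matters.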

Also note that the default local parallelism is not equal across sources: the Kafka source defaults to 2, while other sources typically default to the CPU count. You need to manually specify an equal value on both vertices.
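Putting the two points together, the wiring might look like the following sketch (assuming the Jet 0.5-era DAG API; the Kafka source supplier is elided and the parallelism value of 4 is an arbitrary example):

```java
DAG dag = new DAG();
Vertex kafkaSource = dag.newVertex("kafka-source", ...)
        .localParallelism(4);   // explicitly set; Kafka source defaults to 2
Vertex decode = dag.newVertex("decode", MyDecodeP::new)
        .localParallelism(4);   // must equal the source's local parallelism
dag.edge(between(kafkaSource, decode).isolated());
```

With equal local parallelism on both vertices, the isolated edge gives each source processor its own dedicated decode processor.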

Another limitation: if you use Processors.mapP for your decode vertex, the mapping function must be stateless. Since you need the items to be ordered, I assume you have some state to keep. For that to work correctly, you have to use a custom processor:

Vertex decode = dag.newVertex("decode", MyDecodeP::new);

The processor implementation:

private static class MyDecodeP extends AbstractProcessor {
    private Object myStateObject;

    @Override
    protected boolean tryProcess(int ordinal, @Nonnull Object item) {
        // Decode the item here, updating per-processor state as needed.
        Object mappedItem = ...;
        // tryEmit() returns false when the outbox is full; in that case Jet
        // will call tryProcess() again later with the same item.
        return tryEmit(mappedItem);
    }
}

The answer was written for Jet 0.5.1.
