How to send the output of two different Spouts to the same Bolt?


Question

I have two Kafka Spouts whose values I want to send to the same bolt.

Is that possible?

Recommended answer

Yes, it is possible:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new KafkaSpout(...));
b.setSpout("topic_2", new KafkaSpout(...));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");

You can use any other grouping, too.

Update:

In order to distinguish tuples (i.e., those from topic_1 and those from topic_2) in the consumer bolt, there are two possibilities:

1) You can use operator IDs (as suggested by @user-4870385):

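// in MyBolt.execute(): component IDs are case-sensitive, so compare with equals()
if ("topic_1".equals(input.getSourceComponent())) {
    // tuple came from the spout registered as "topic_1"
} else {
    // tuple came from the spout registered as "topic_2"
}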

2) You can use stream names (as suggested by @zenbeni). In this case, both spouts need to declare named streams, and the bolt needs to connect to the spouts by stream name:

public class MyKafkaSpout extends KafkaSpout {
  final String streamName;

  public MyKafkaSpout(String stream) {
    this.streamName = stream;
  }

  // other stuff omitted

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // compare KafkaSpout.declareOutputFields(...)
    declarer.declare(streamName, _spoutConfig.scheme.getOutputFields());
  }
}

When building the topology, the stream names now need to be used:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));
b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");

In MyBolt, the stream name can now be used to distinguish input tuples:

// in MyBolt.execute():
if (input.getSourceStreamId().equals("stream_t1")) {
  // tuple arrived on stream_t1 (from topic_1)
} else {
  // tuple arrived on stream_t2 (from topic_2)
}

Discussion:

While the second approach, using stream names, is more natural (according to @zenbeni), the first is more flexible (IMHO). Stream names are declared by the spout/bolt directly (i.e., at the time the spout/bolt code is written); in contrast, operator IDs are assigned when the topology is put together (i.e., at the time the spout/bolt is used).

Let's assume we get three bolts as class files (no source code). The first two should be used as producers, and both declare output streams with the same name. If the third bolt, the consumer, distinguishes input tuples by stream, this will not work. Even if the two producer bolts declare different output stream names, the input stream names the consumer expects might be hard-coded and might not match, so that does not work either. However, if the consumer bolt uses component names (even hard-coded ones) to distinguish incoming tuples, the expected component IDs can be assigned when the topology is built.
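The scenario can be modelled in plain Java without Storm on the classpath. The sketch below is only an illustration: ModelTuple, ClosedProducer, and the stream name "out" are hypothetical stand-ins, not Storm classes or names from the original answer. It shows a consumer that routes by stream name failing to tell two producers apart, while one that routes by component ID succeeds, because the ID is chosen at wiring time.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the scenario; none of these types are Storm classes. */
class ComponentIdDemo {

    /** Hypothetical stand-in for a Storm tuple: its origin plus a payload. */
    static final class ModelTuple {
        final String sourceComponent; // assigned when the topology is wired
        final String sourceStreamId;  // fixed inside the producer's code
        final String value;

        ModelTuple(String sourceComponent, String sourceStreamId, String value) {
            this.sourceComponent = sourceComponent;
            this.sourceStreamId = sourceStreamId;
            this.value = value;
        }
    }

    /** Hypothetical closed-source producer: its stream name cannot be changed. */
    static final class ClosedProducer {
        private static final String STREAM = "out"; // hard-coded by its author

        ModelTuple emit(String componentId, String value) {
            return new ModelTuple(componentId, STREAM, value);
        }
    }

    /** Consumer logic that tells inputs apart by a hard-coded stream name. */
    static String routeByStream(ModelTuple t) {
        return "stream_t1".equals(t.sourceStreamId) ? "first" : "second";
    }

    /** Consumer logic that tells inputs apart by a hard-coded component ID. */
    static String routeByComponent(ModelTuple t) {
        return "topic_1".equals(t.sourceComponent) ? "first" : "second";
    }

    /** Wires the same producer class twice under two different component IDs. */
    static List<ModelTuple> run() {
        ClosedProducer producer = new ClosedProducer();
        List<ModelTuple> tuples = new ArrayList<>();
        tuples.add(producer.emit("topic_1", "a")); // ID chosen at wiring time
        tuples.add(producer.emit("topic_2", "b"));
        return tuples;
    }

    public static void main(String[] args) {
        for (ModelTuple t : run()) {
            System.out.println(t.value + " byStream=" + routeByStream(t)
                    + " byComponent=" + routeByComponent(t));
        }
    }
}
```

In this model, both tuples carry the producer's fixed stream name, so routeByStream puts them in the same branch; routeByComponent separates them because whoever assembles the topology can pick the IDs the consumer expects.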

Of course, it would be possible to inherit from the given classes (if they are not declared final) and override declareOutputFields(...) in order to assign your own stream names. However, this is additional work.
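The shape of that workaround can be sketched in plain Java as well. This is again a model under stated assumptions: GivenProducer, RenamedProducer, and outputStream() are hypothetical names standing in for a third-party bolt and its stream declaration, not part of the Storm API.

```java
/** Plain-Java model of the subclassing workaround; these are not Storm classes. */
class StreamRenameDemo {

    /** Hypothetical third-party producer: not final, stream name fixed by its author. */
    static class GivenProducer {
        String outputStream() {
            return "out";
        }
    }

    /** Subclass that overrides the declaration with a caller-chosen stream name. */
    static class RenamedProducer extends GivenProducer {
        private final String streamName;

        RenamedProducer(String streamName) {
            this.streamName = streamName;
        }

        @Override
        String outputStream() {
            return streamName;
        }
    }

    public static void main(String[] args) {
        GivenProducer first = new RenamedProducer("stream_t1");
        GivenProducer second = new RenamedProducer("stream_t2");
        System.out.println(first.outputStream() + " " + second.outputStream());
    }
}
```

One such subclass has to be written for every given class, which is the extra work mentioned above; assigning component IDs when building the topology needs no new classes at all.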
