如何将两个不同 Spout 的输出发送到同一个 Bolt? [英] How to send output of two different Spout to the same Bolt?

查看:25
本文介绍了如何将两个不同 Spout 的输出发送到同一个 Bolt?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有两个 Kafka Spout,我想将它们的值发送到同一个 Bolt.

I have two Kafka Spouts whose values I want to send to the same bolt.

有可能吗?

推荐答案

是的,这是可能的:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new KafkaSpout(...));
b.setSpout("topic_2", new KafkaSpout(...));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1").shuffleGrouping("topic_2");

您也可以使用任何其他分组.

You can use any other grouping, too.

更新:

为了区分consumer bolt中的tuples(即topic_1或topic_2),有两种可能:

In order to distinguish tuples (ie, topic_1 or topic_2) in consumer bolt, there are two possibilities:

1) 您可以使用操作员 ID(如@user-4870385 所建议):

1) You can use operator IDs (as suggested by @user-4870385):

if(input.getSourceComponent().equalsIgnoreCase("topic_1")) {
    //do something
} else {
    //do something
}

2) 您可以使用流名称(如@zenbeni 所建议).对于这种情况,两个 spout 都需要声明命名流,并且 bolt 需要通过流名称连接到 spout:

2) You can use stream names (as suggested by @zenbeni). For this case, both spouts need to declare named streams and the bolt need to connect to spouts by stream names:

public class MyKafkaSpout extends KafkaSpout {
  final String streamName;

  public MyKafkaSpout(String stream) {
    this.streamName = stream;
  }

  // other stuff omitted

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // compare KafkaSpout.declareOutputFields(...)
    declarer.declare(streamName, _spoutConfig.scheme.getOutputFields());
  }
}

构建拓扑,现在需要使用流名称:

Build the topology, stream names need to be used now:

TopologyBuilder b = new TopologyBuilder();
b.setSpout("topic_1", new MyKafkaSpout("stream_t1"));
b.setSpout("topic_2", new MyKafkaSpout("stream_t2"));
b.setBolt("bolt", new MyBolt(...)).shuffleGrouping("topic_1", "stream_t1").shuffleGrouping("topic_2", "stream_t2");

MyBolt 中,流名称现在可用于区分输入元组:

In MyBolt the stream name can now be used to distinguish input tuples:

// in my MyBolt.execute():
if(input.getSourceStreamId().equals("Topic1")) {
  // do something
} else {
  // do something
}

讨论:

虽然使用流名称的第二种方法更自然(根据@zenbeni),第一种更灵活(IHMO).流名称由 spout/bolt 直接声明(即在编写 spout/bolt 代码时);相比之下,当拓扑组合在一起时(即,在使用 spout/bolt 时)分配操作符 ID.

While the second approach using stream names is more natural (according to @zenbeni), the first is more flexible (IHMO). Stream names are declared by spout/bolt directly (ie, at the time the spout/bolt code is written); in contrast, operator IDs are assigned when topology is put together (ie, at the time the spout/bolt is used).

假设我们得到三个螺栓作为类文件(没有源代码).前两个应该用作生产者,并且都声明具有相同名称的输出流.如果第三个消费者按流区分输入元组,这将不起作用.即使两个给定的生产者 bolt 声明了不同的输出流名称,预期的输入流名称也可能在消费者 bolt 中硬编码并且可能不匹配.因此,它也不起作用.但是,如果消费者 Bolt 使用组件名称(即使它们是硬编码的)来区分传入的元组,则可以正确分配预期的组件 ID.

Let's assume we get three bolts as class files (no source code). The first two should be used as producers and both declare output streams with the same name. If the third consumer distinguishes input tuples by stream, this will not work. Even if both given producer bolts declare different output stream names, the expected input stream names might be hard coded in consumer bolt and might not match. Thus, it does not work either. However, if the consumer bolt uses component names (even if they are hard coded) to distinguish incoming tuples, the expected component IDs can be assigned correctly.

当然,可以从给定的类继承(如果没有声明 final 并覆盖 declareOutputFields(...) 以分配自己的流名称. 然而,这是更多额外的工作要做.

Of course, it would be possible to inherit from the given classes (if not declared final and overwrite declareOutputFields(...) in order to assign own stream names. However, this is more additional work to do.

这篇关于如何将两个不同 Spout 的输出发送到同一个 Bolt?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆