Storm - Conditionally consuming stream from kafka spout?

Question

I have a scenario where I post JSON to a Kafka instance and then use a Kafka spout to emit the stream to a bolt.

Now I would like to add an additional field (call it x) to my JSON message. If x is a, I would like the message to be consumed by boltA; if x is b, I would like it to be consumed by boltB.

Is there a way to direct the stream to the proper bolt depending on the stream's contents?

Recommended answer

The simplest way should be to add a SplitBolt that consumes from the KafkaSpout, evaluates the field x, and forwards each tuple to one of two output streams:

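import java.util.Map;

// Storm 1.0+ packages; releases before 1.0 use backtype.storm.* instead.
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class SplitBolt extends BaseRichBolt {
  OutputCollector collector;

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    Object x = ... // get field x from input
    String streamId;
    if (a.equals(x)) { // a is a placeholder for the first value of x; == would compare references
      streamId = "stream-xa";
    } else { // x == b
      streamId = "stream-xb";
    }
    // Anchor the emitted tuple to the input so failures are replayed from the spout.
    collector.emit(streamId, input, input.getValues());
    // BaseRichBolt does not ack automatically.
    collector.ack(input);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    Fields schema = new Fields(...);
    // Every stream id used in execute() must be declared here.
    declarer.declareStream("stream-xa", schema);
    declarer.declareStream("stream-xb", schema);
  }
}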
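
How field x is read from the input depends on how the spout deserializes the Kafka message, which the answer leaves open. As a rough sketch of just the routing decision, the following standalone class assumes the tuple carries the raw JSON as a string and that x is a string field whose values are literally "a" and "b". The class name StreamRouter, the method selectStream, and the regex-based extraction are all hypothetical and not part of Storm; in a real topology a proper JSON library would likely be a better choice than a regular expression.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Storm: maps a JSON message to a stream id.
public class StreamRouter {
    // Illustrative only: finds a string-valued field "x" in the JSON text.
    // Assumes x is a plain string without escaped quotes.
    private static final Pattern X_FIELD =
        Pattern.compile("\"x\"\\s*:\\s*\"([^\"]*)\"");

    public static final String STREAM_XA = "stream-xa";
    public static final String STREAM_XB = "stream-xb";

    // Returns the stream id for the message, or null when x is
    // missing or has a value other than "a" or "b".
    public static String selectStream(String json) {
        Matcher m = X_FIELD.matcher(json);
        if (!m.find()) {
            return null;
        }
        String x = m.group(1);
        if ("a".equals(x)) {
            return STREAM_XA;
        }
        if ("b".equals(x)) {
            return STREAM_XB;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(selectStream("{\"x\": \"a\", \"payload\": 1}")); // stream-xa
        System.out.println(selectStream("{\"x\": \"b\"}"));                 // stream-xb
        System.out.println(selectStream("{\"payload\": 1}"));               // null
    }
}
```

Inside execute(), a bolt could call such a helper and, when it returns null, ack the tuple without emitting it (or emit it to a separately declared error stream) rather than treating every non-a value as b.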

When building your topology, you connect BoltA to "stream-xa" and BoltB to "stream-xb":

TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");

As an alternative, it should also be possible to inherit from KafkaSpout and emit to two different streams directly. However, that code is trickier to get right.
