风暴-卡夫卡喷口有条件地消耗溪流吗? [英] Storm - Conditionally consuming stream from kafka spout?

查看:79
本文介绍了风暴-卡夫卡喷口有条件地消耗溪流吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一种将json发布到Kafka实例的方案.然后,我使用Kafka喷口将水流喷到螺栓上.

I have a scenario where I am posting json to a Kafka instance. I am then using a Kafka Spout to emit the stream to a bolt.

但是现在我想在我的json消息中添加其他字段(称为x).如果xa,我希望它被螺栓A消耗,如果xb,我希望它被螺栓B消耗.

But now I would like to add additional field (call it x) to my json message. If x is a I would like it to be consumed by boltA, if x is b I would like it to be consumed by boltB.

是否有一种方法可以根据溪流的内容将溪流引导至适当的螺栓?

Is there a way to direct the stream to the proper bolt depending on the streams contents?

推荐答案

最简单的方法应该是添加一个SplitBolt,该SplitBoltKafkaSpout消耗,计算字段x,然后转发到不同的输出流:

The simplest way should be to add a SplitBolt that consumes from KafkaSpout, evaluates the field x , and forwards to different output streams:

public class SplitBolt extends BaseRichBolt {
  OutputCollector collector;

  public void prepare(...) {
    this.collector = collector;
  }

  public void execute(Tuple input) {
    Object x = ... // get field x from input
    String streamId;
    if(x == a) {
      streamId = "stream-xa";
    } else { // x == b
      streamId = "stream-xb";
    }
    collector.emit(streamId, input, input.getValues());
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    Fields schema = new Fields(...)
    declarer.declareStream("stream-xa", schema);
    declarer.declareStream("stream-xy", schema);
  }
}

构建拓扑时,将BoltA连接到"stream-xa",将BoltB连接到"stream-xb":

When building your Topology, you connect BoltA to "stream-xa" and BoltB to "stream-xb":

TopologyBuilder b = new TopologyBuilder();
b.setSpout("spout", new KafkaSpout(...));
b.setBolt("split", new SplitBolt()).shuffleGrouping("spout");
b.setBolt("boltA", new BoltA()).shuffleGrouping("split", "stream-xa");
b.setBolt("boltB", new BoltB()).shuffleGrouping("split", "stream-xb");

作为替代方案,也应该有可能从KafkaSpout继承并直接发射到两个不同的流.但是,代码更难正确.

As an alternative, it should also be possible, to inherit from KafkaSpout and emit to two different streams directly. However, the code is more tricky to get right.

这篇关于风暴-卡夫卡喷口有条件地消耗溪流吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆