在 Apache Storm bolt 中使用 Apache Camel ProducerTemplate [英] Using Apache Camel ProducerTemplate in Apache Storm bolt

查看:34
本文介绍了在 Apache Storm bolt 中使用 Apache Camel ProducerTemplate的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试编写简单的 Storm + Camel 项目.我的 Storm 拓扑分析了推文,一个 bolt 应该将推文文本发送到 apache camel route,而后者又使用 websocket 通知某些 webapp.

I'm trying to write simple Storm + Camel project. My Storm topology analyzes tweets and one bolt should send tweet text to apache camel route, which in turn is using websocket to notify some webapp.

由于在尝试使用 build once CamelContext 时从 Bolts 收到 NotSerializableExceptions,我无法使其工作.

I cannot make it work due to NotSerializableExceptions received from bolts when trying to use build once CamelContext.

我已经尝试过的:

  • 在 bolt 的构造函数中传递 CamelContext - 导致 NotSerializableException
  • 在storm conf 中传递CamelContext,并在bolt 的prepare(...) 方法中使用它来访问它.结果:

  • pass CamelContext in bolt's constructor - results in NotSerializableException
  • pass CamelContext in storm conf, and use it in bolt's prepare(...) method to gian access to it. Results in :

14484 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] 死了java.lang.IllegalArgumentException:拓扑 conf 不是 json-serializable在 backtype.storm.testing$submit_local_topology.invoke(testing.clj:262) ~[storm-core-0.9.4.jar:0.9.4]在 backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4]在 backtype.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-0.9.4.jar:0.9.4]

14484 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died java.lang.IllegalArgumentException: Topology conf is not json-serializable at backtype.storm.testing$submit_local_topology.invoke(testing.clj:262) ~[storm-core-0.9.4.jar:0.9.4] at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4] at backtype.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-0.9.4.jar:0.9.4]

骆驼路线:

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:main")
                .to("websocket:localhost:8085/main?sendToAll=true");
    }
}

风暴拓扑:Tweet Spout 正在使用 twitter4j stremaing API 传播推文.

Storm Topology: Tweet Spout is spreading tweets using twitter4j stremaing API.

public class TwitterStreamTopology {

    public static void main(String[] args) {
        CamelContext producerTemplate = new RouteStarter().buildRoute();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
        builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
        Config conf = new Config();
        conf.put("producerTemplate", producerTemplate.createProducerTemplate());
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mytopology", conf, builder.createTopology());

        Utils.sleep(20000);
        cluster.shutdown();
    }
}

WebsocketBolt:

WebsocketBolt:

public class WebSocketBolt extends BaseBasicBolt {
    private ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
    }
}

有没有办法很好地做到这一点?

Is there a way to do this nicely?

或者我应该让 http 访问骆驼路由,并在 bolt prepare(...) 方法中创建一些 HttpClient?这看起来还是有点矫枉过正,必须有办法让它更容易.

Or should I make camel route being accessed by http, and create some HttpClient in bolt prepare(...) method? This still looks like a little overkill, and there has to be a way to make it easier.

感谢大家的帮助!

推荐答案

您问题的根本原因是您将 ProducerTemplate 添加到您的 Storm 配置中,并且由于它不可序列化而引发异常.如果那是您自己的类,您可以更改代码以使其正常工作,但由于这是一个 Camel 类,我建议采用不同的方法.

The root cause of your problem is that you're adding ProducerTemplate to your storm config and it is throwing an exception because it isn't serializable. If that were your own class, you could change the code to make it work but since that is a Camel class I would recommend a different approach.

  1. WebSocketBolt:将您的 producerTemplate 私有成员更改为瞬态:private瞬态 ProducerTemplate producerTemplate; 以便它不会尝试序列化(与将其放入 conf 时遇到的问题相同).
  2. WebSocketBolt:在准备方法中而不是在拓扑中初始化 producerTemplate.
  1. WebSocketBolt: Change your producerTemplate private member to be transient: private transient ProducerTemplate producerTemplate; so that it will not attempt to be serialized (same problem you have with putting it into conf).
  2. WebSocketBolt: Initialize producerTemplate inside your prepare method rather than in your topology.

像这样:

public class WebSocketBolt extends BaseBasicBolt {
    private transient ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        CamelContext producerTemplate = new RouteStarter().buildRoute();
        this.producerTemplate = producerTemplate.createProducerTemplate();
    }
}

这篇关于在 Apache Storm bolt 中使用 Apache Camel ProducerTemplate的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆