在Apache Storm螺栓中使用Apache Camel ProducerTemplate [英] Using Apache Camel ProducerTemplate in Apache Storm bolt

查看:136
本文介绍了在Apache Storm螺栓中使用Apache Camel ProducerTemplate的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试编写简单的Storm + Camel项目. 我的Storm拓扑分析了tweet,一个螺栓应该将tweet文本发送到apache骆驼路线,而后者又使用websocket通知了一些webapp.

I'm trying to write simple Storm + Camel project. My Storm topology analyzes tweets and one bolt should send tweet text to apache camel route, which in turn is using websocket to notify some webapp.

由于在尝试使用一次CamelContext构建时,由于从螺栓接收到NotSerializableExceptions,所以我无法使其工作.

I cannot make it work due to NotSerializableExceptions received from bolts when trying to use build once CamelContext.

我已经尝试过的:

  • 在螺栓的构造函数中传递CamelContext-导致NotSerializableException
  • 在storm conf中传递CamelContext,并在bolt的prepare(...)方法中使用它来加强对其的访问.结果:

  • pass CamelContext in bolt's constructor - results in NotSerializableException
  • pass CamelContext in storm conf, and use it in bolt's prepare(...) method to gian access to it. Results in :

14484 [main]错误org.apache.storm.zookeeper.server.NIOServerCnxnFactory-线程Thread [main,5,main]死亡 java.lang.IllegalArgumentException:拓扑conf无法json序列化 在backtype.storm.testing $ submit_local_topology.invoke(testing.clj:262)〜[storm-core-0.9.4.jar:0.9.4] 在backtype.storm.LocalCluster $ _submitTopology.invoke(LocalCluster.clj:43)〜[storm-core-0.9.4.jar:0.9.4] 在backtype.storm.LocalCluster.submitTopology(未知来源)〜[storm-core-0.9.4.jar:0.9.4]

14484 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died java.lang.IllegalArgumentException: Topology conf is not json-serializable at backtype.storm.testing$submit_local_topology.invoke(testing.clj:262) ~[storm-core-0.9.4.jar:0.9.4] at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4] at backtype.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-0.9.4.jar:0.9.4]

骆驼路线:

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:main")
                .to("websocket:localhost:8085/main?sendToAll=true");
    }
}

风暴拓扑: Tweet Spout正在使用twitter4j stremaing API传播tweet.

Storm Topology: Tweet Spout is spreading tweets using twitter4j stremaing API.

public class TwitterStreamTopology {

    public static void main(String[] args) {
        CamelContext producerTemplate = new RouteStarter().buildRoute();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
        builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
        Config conf = new Config();
        conf.put("producerTemplate", producerTemplate.createProducerTemplate());
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mytopology", conf, builder.createTopology());

        Utils.sleep(20000);
        cluster.shutdown();
    }
}

WebsocketBolt:

WebsocketBolt:

public class WebSocketBolt extends BaseBasicBolt {
    private ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
    }
}

有没有办法很好地做到这一点?

Is there a way to do this nicely?

还是我应该让骆驼路由被http访问,并在bolt prepare(...)方法中创建一些HttpClient?这看起来仍然有些矫kill过正,并且必须有一种使其变得更容易的方法.

Or should I make camel route being accessed by http, and create some HttpClient in bolt prepare(...) method? This still looks like a little overkill, and there has to be a way to make it easier.

感谢所有帮助!

推荐答案

问题的根本原因是您正在将ProducerTemplate添加到您的风暴配置中,并且由于无法序列化而引发了异常.如果那是您自己的课,则可以更改代码以使其起作用,但是由于这是骆驼课,所以我建议您使用其他方法.

The root cause of your problem is that you're adding ProducerTemplate to your storm config and it is throwing an exception because it isn't serializable. If that were your own class, you could change the code to make it work but since that is a Camel class I would recommend a different approach.

  1. WebSocketBolt:将您的producerTemplate私有成员更改为临时成员:private transient ProducerTemplate producerTemplate;,以便它不会尝试进行序列化(将其放入conf时遇到同样的问题).
  2. WebSocketBolt:在prepare方法而不是拓扑中初始化producerTemplate.
  1. WebSocketBolt: Change your producerTemplate private member to be transient: private transient ProducerTemplate producerTemplate; so that it will not attempt to be serialized (same problem you have with putting it into conf).
  2. WebSocketBolt: Initialize producerTemplate inside your prepare method rather than in your topology.

类似这样的东西:

public class WebSocketBolt extends BaseBasicBolt {
    private transient ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        CamelContext producerTemplate = new RouteStarter().buildRoute();
        this.producerTemplate = producerTemplate.createProducerTemplate();
    }
}

这篇关于在Apache Storm螺栓中使用Apache Camel ProducerTemplate的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆