How to use Drools in a Storm topology

Question

I want to use Drools in a bolt. It works fine in the LocalCluster, but when I deploy it to the production cluster it fails with an error. The bolt is:

public class DealLostBolt extends BaseRichBolt {

      private static final long serialVersionUID = 1L;

      private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");

      private OutputCollector collector;

      private KieSession kieSession;

      private FactHandle factHandle;

      @Override
      public void execute(Tuple input) {
        // Get the data
        String sentence = (String) input.getValue(0);
        LOGGER.info("DealLostBolt获取到的数据:" + sentence);

        // Convert the data
        PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);

        KieServices ks = KieServices.Factory.get();
        KieContainer kieContainer = ks.getKieClasspathContainer();
        kieSession = kieContainer.newKieSession("all-rule");
        kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();

        factHandle = kieSession.insert(dataPoint);
        kieSession.fireAllRules();
        kieSession.delete(factHandle);

        collector.emit(new Values(sentence));
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));

      }

      @Override
      public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
      }

    }

I followed the official documentation to create the KieSession. The error is:

java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
    Caused by: java.lang.NullPointerException
    at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
    at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
    ... 6 more

Perhaps something is not initialized. But I create a new KieServices instance every time the bolt executes. Could somebody help me?

Thanks!

Recommended answer

I had a similar problem using Drools together with JMH as a shaded jar. Drools uses a ServiceRegistry approach. This means the Drools libraries (drools-compiler, kie-ci, drools-decisiontables, ...) each contain an identically named properties file that indicates which implementations of an interface they offer.

The shade plugin usually flattens the (transitive) dependencies into one jar. For files that exist multiple times, this usually means that only one of them is kept unless specified otherwise. For ServiceRegistry properties, we need to combine all of the files. Usually this is done via the ServicesResourceTransformer. That transformer handles files in META-INF/services/, but the relevant file for Drools is META-INF/kie.conf. My problem with JMH could be solved with the AppendingTransformer:

<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
    <resource>META-INF/kie.conf</resource>
</transformer>
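
To illustrate why appending matters, here is a conceptual sketch of the two merge behaviours described above: keeping only one of several same-named files versus concatenating them all. It is not the shade plugin's actual code; the class name `KieConfMergeSketch` and the file contents are hypothetical placeholders, not real Drools entries.

```java
import java.util.Arrays;
import java.util.List;

public class KieConfMergeSketch {

    /** "Pick one" behaviour: only the first file with a given name survives. */
    public static String keepFirst(List<String> files) {
        return files.isEmpty() ? "" : files.get(0);
    }

    /** Appending behaviour: the contents of all same-named files are concatenated. */
    public static String appendAll(List<String> files) {
        StringBuilder merged = new StringBuilder();
        for (String content : files) {
            merged.append(content);
            // Make sure each file's last entry ends with a line break.
            if (!content.endsWith("\n")) {
                merged.append('\n');
            }
        }
        return merged.toString();
    }

    public static void main(String[] args) {
        // Hypothetical contents of two same-named files from two libraries.
        List<String> files = Arrays.asList(
                "example.ServiceA = example.impl.ServiceAImpl\n",
                "example.ServiceB = example.impl.ServiceBImpl\n");
        System.out.println("keepFirst:\n" + keepFirst(files));
        System.out.println("appendAll:\n" + appendAll(files));
    }
}
```

With the "pick one" behaviour, the entry for the second library is lost, so a lookup of that service at runtime finds nothing. That would be consistent with a NullPointerException appearing only in the shaded jar.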

I'm not a Storm expert, but the Starter suggests that it also uses the shade plugin. I assume you run your local cluster from the IDE, which does not use the shaded jar.
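
One way to check this assumption is to list every copy of META-INF/kie.conf that the class loader can see, once from the IDE and once from the shaded jar. The following is a minimal diagnostic sketch using only the standard `ClassLoader.getResources` call; the class name `KieConfLocator` is made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class KieConfLocator {

    /** Returns every classpath location that provides the given resource name. */
    public static List<URL> locate(ClassLoader loader, String resourceName) {
        try {
            return Collections.list(loader.getResources(resourceName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String name = "META-INF/kie.conf";
        List<URL> copies = locate(KieConfLocator.class.getClassLoader(), name);
        System.out.println(copies.size() + " location(s) provide " + name + ":");
        for (URL url : copies) {
            System.out.println("  " + url);
        }
    }
}
```

Run from the IDE, this should print one location per Drools jar on the classpath. Run from the shaded jar, it should print a single location, and that one file needs to contain the entries of all the original files for the ServiceRegistry lookups to succeed.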
