How event streams work in WSO2 CEP 3.0.0

Question

I am working with WSO2 CEP 3.0.0. Both my input source and my output source are JMS. I have written my input and output event adaptors like this.

Input event adaptor:

<?xml version="1.0" encoding="UTF-8"?>
<inputEventAdaptor name="jmsProxy" statistics="disable" trace="enable"
  type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
  <property name="java.naming.provider.url">tcp://localhost:61616</property>
  <property name="transport.jms.SubscriptionDurable">false</property>
  <property name="transport.jms.UserName">admin</property>
  <property name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</property>
  <property name="transport.jms.Password">admin</property>
  <property name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</property>
  <property name="transport.jms.DestinationType">queue</property>
</inputEventAdaptor>

Output event adaptor:

<?xml version="1.0" encoding="UTF-8"?>
<outputEventAdaptor name="OUTJmsProxy" statistics="disable" trace="disable"
  type="jms" xmlns="http://wso2.org/carbon/eventadaptormanager">
  <property name="java.naming.security.principal">admin</property>
  <property name="java.naming.provider.url">tcp://localhost:61616</property>
  <property name="java.naming.security.credentials">admin</property>
  <property name="java.naming.factory.initial">org.apache.activemq.jndi.ActiveMQInitialContextFactory</property>
  <property name="transport.jms.ConnectionFactoryJNDIName">QueueConnectionFactory</property>
  <property name="transport.jms.DestinationType">queue</property>
</outputEventAdaptor>

My input message on the jmsProxy JMS queue looks like this:

<soapenv:Body xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope">
   <uuid>cc253480-95b3-418e-b282-7e87f885c99e</uuid>
   <Remarks>t4</Remarks>
   <ReadingsLiteTaildtos>
      <ReadingsLiteTaildto>
         <FinalValue>70</FinalValue>
         <InputText>Chiller Feeder Current R - Ph</InputText>
         <InputValue>0.0</InputValue>
         <ParameterId>-2499999974</ParameterId>
         <SlNo>1</SlNo>
      </ReadingsLiteTaildto>
      <ReadingsLiteTaildto>
         <FinalValue>70</FinalValue>
         <InputText>Chiller Feeder Current Y - Ph</InputText>
         <InputValue>0.0</InputValue>
         <ParameterId>-2499999973</ParameterId>
         <SlNo>2</SlNo>
      </ReadingsLiteTaildto>
      <ReadingsLiteTaildto>
         <FinalValue>70</FinalValue>
         <InputText>Chiller Feeder Current B - Ph</InputText>
         <InputValue>0.0</InputValue>
         <ParameterId>-2499999972</ParameterId>
         <SlNo>3</SlNo>
      </ReadingsLiteTaildto>
      <ReadingsLiteTaildto>
         <FinalValue>70</FinalValue>
         <InputText>Chiller Energy Meter Reading</InputText>
         <InputValue>0.0</InputValue>
         <ParameterId>-2499999971</ParameterId>
         <SlNo>4</SlNo>
      </ReadingsLiteTaildto>
   </ReadingsLiteTaildtos>
   <ReadingDateTime>1381757157596</ReadingDateTime>
   <PartyBranchId>-2500000000</PartyBranchId>
   <ParametersetId>-2499999974</ParametersetId>
   <AssetId>-2499999995</AssetId>
   <TaskId>811291126760647</TaskId>
   <WorkOUId>-1</WorkOUId>
   <activityid>-2500000000</activityid>
   <userid>-2499999993</userid>
   <entrymode>0</entrymode>
   <DeviceId>-1</DeviceId>
</soapenv:Body>

I want to raise an event when FinalValue exceeds a maximum value, for example when it is greater than 100. How should I write the stream and the execution plan?

The stream-manager-config.xml file has three sections:

1. metaData 2. correlationData 3. payloadData

For the message above, how do I decide which data goes under which section? Also, should the input payload and the output payload be defined in the same stream configuration file, or do they need to be defined separately?

Is CEP suitable for this use case or not?

Thanks in advance.

Recommended answer

Yes, this is a typical use case for CEP.

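You can use an event builder similar to the following. Set eventAdaptorName to the name of your own input event adaptor (jmsProxy in the question) and transport.jms.Destination to the queue you read from.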

<?xml version="1.0" encoding="UTF-8"?>
<eventBuilder name="ReadingsDtoBuilder" statistics="disable"
    trace="disable" xmlns="http://wso2.org/carbon/eventbuilder">
<from eventAdaptorName="jmsEventReceiver" eventAdaptorType="jms">
    <property name="transport.jms.Destination">ReadingsQueue</property>
</from>
<mapping customMapping="disable"
    parentXpath="//ReadingsLiteTaildtos" type="xml">
    <property>
        <from xpath="//ReadingsLiteTaildto/ParameterId"/>
        <to name="meta_parameterId" type="string"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/SlNo"/>
        <to name="meta_slno" type="string"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/FinalValue"/>
        <to name="finalValue" type="int"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/InputText"/>
        <to name="inputText" type="string"/>
    </property>
    <property>
        <from xpath="//ReadingsLiteTaildto/InputValue"/>
        <to name="inputValue" type="double"/>
    </property>
</mapping>
<to streamName="org.sample.readings.dto.stream" version="1.0.0"/>
</eventBuilder>
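To make the mapping easier to picture, here is a minimal Python sketch of the same extraction, written outside WSO2 CEP. It is not how the event builder is implemented; it only assumes that the mapping yields one flat event per ReadingsLiteTaildto element, with the attribute names used in the `<to name=.../>` entries. The function name `extract_events` and the trimmed `MESSAGE` constant are hypothetical. Element names are matched case-sensitively, so the message's SlNo element must be spelled exactly that way in the path.

```python
import xml.etree.ElementTree as ET

# Trimmed copy of the message from the question (first two readings only).
MESSAGE = """<soapenv:Body xmlns:soapenv="http://www.w3.org/2003/05/soap-envelope">
   <uuid>cc253480-95b3-418e-b282-7e87f885c99e</uuid>
   <ReadingsLiteTaildtos>
      <ReadingsLiteTaildto>
         <FinalValue>70</FinalValue>
         <InputText>Chiller Feeder Current R - Ph</InputText>
         <InputValue>0.0</InputValue>
         <ParameterId>-2499999974</ParameterId>
         <SlNo>1</SlNo>
      </ReadingsLiteTaildto>
      <ReadingsLiteTaildto>
         <FinalValue>70</FinalValue>
         <InputText>Chiller Feeder Current Y - Ph</InputText>
         <InputValue>0.0</InputValue>
         <ParameterId>-2499999973</ParameterId>
         <SlNo>2</SlNo>
      </ReadingsLiteTaildto>
   </ReadingsLiteTaildtos>
</soapenv:Body>"""


def extract_events(xml_text):
    """Return one flat dict per ReadingsLiteTaildto element (assumed behaviour)."""
    root = ET.fromstring(xml_text)
    events = []
    # The child elements carry no namespace, so plain tag names match.
    for dto in root.iter("ReadingsLiteTaildto"):
        events.append({
            # meta_ prefix marks attributes that live under metaData in the stream.
            "meta_parameterId": dto.findtext("ParameterId"),
            "meta_slno": dto.findtext("SlNo"),
            # Payload attributes, converted to the types declared in the mapping.
            "finalValue": int(dto.findtext("FinalValue")),
            "inputText": dto.findtext("InputText"),
            "inputValue": float(dto.findtext("InputValue")),
        })
    return events


if __name__ == "__main__":
    for event in extract_events(MESSAGE):
        print(event)
```

Identifiers such as ParameterId and SlNo fit well under metaData because they describe the reading, while the measured values go under payloadData.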

The execution plan can be as follows.

<?xml version="1.0" encoding="UTF-8"?>
<executionPlan name="ReadingsAnalyzer" statistics="disable"
  trace="disable" xmlns="http://wso2.org/carbon/eventprocessor">
  <description>This execution plan analyzes readings and triggers notifications based on a threshold.</description>
  <siddhiConfiguration>
    <property name="siddhi.enable.distributed.processing">false</property>
    <property name="siddhi.persistence.snapshot.time.interval.minutes">0</property>
  </siddhiConfiguration>
  <importedStreams>
    <stream as="readings" name="org.sample.readings.dto.stream" version="1.0.0"/>
  </importedStreams>
  <queryExpressions><![CDATA[from readings[finalValue > 100]
select *
insert into notificationStream;]]></queryExpressions>
  <exportedStreams>
    <stream name="notificationStream" valueOf="notificationStream" version="1.0.0"/>
  </exportedStreams>
</executionPlan>
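The filter in the query above can be sketched in plain Python to show which events would reach notificationStream. This is only an illustration of the condition `finalValue > 100`, not Siddhi itself; the function name `notifications` and the sample events with values 120 and 100 are hypothetical. Note that every reading in the question's sample message has FinalValue 70, so that message alone would produce no notification.

```python
THRESHOLD = 100  # same limit as the filter finalValue > 100


def notifications(events, threshold=THRESHOLD):
    """Keep only events whose finalValue is strictly greater than the threshold."""
    # Copy each matching event, mirroring "select *" passing all attributes through.
    return [dict(event) for event in events if event["finalValue"] > threshold]


# Hypothetical events in the shape of org.sample.readings.dto.stream.
readings = [
    {"meta_parameterId": "-2499999974", "meta_slno": "1",
     "finalValue": 70, "inputText": "Chiller Feeder Current R - Ph", "inputValue": 0.0},
    {"meta_parameterId": "-2499999973", "meta_slno": "2",
     "finalValue": 120, "inputText": "Chiller Feeder Current Y - Ph", "inputValue": 0.0},
    {"meta_parameterId": "-2499999972", "meta_slno": "3",
     "finalValue": 100, "inputText": "Chiller Feeder Current B - Ph", "inputValue": 0.0},
]

if __name__ == "__main__":
    for event in notifications(readings):
        print(event["meta_slno"], event["finalValue"])
```

Because the comparison is strict, a reading of exactly 100 does not raise an event; use `>=` in the query if the limit itself should trigger one.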

You can define the streams inside stream-manager-config.xml similar to the following.

<streamDefinition name="org.sample.readings.dto.stream" version="1.0.0">
    <metaData>
        <property name="parameterId" type="STRING"/>
        <property name="slno" type="STRING"/>
    </metaData>
    <payloadData>
        <property name="finalValue" type="INT"/>
        <property name="inputText" type="STRING"/>
        <property name="inputValue" type="DOUBLE"/>
    </payloadData>
</streamDefinition>
<streamDefinition name="notificationStream" version="1.0.0">
    <metaData>
        <property name="parameterId" type="STRING"/>
        <property name="slno" type="STRING"/>
    </metaData>
    <payloadData>
        <property name="finalValue" type="INT"/>
        <property name="inputText" type="STRING"/>
        <property name="inputValue" type="DOUBLE"/>
    </payloadData>
</streamDefinition>
