Combining two partitioned streams into one stream

Question

I am new to Azure Event Hubs. With Event Hubs, we receive data from an IoT device, and the data is partitioned into two streams by assigning partition numbers "0" and "1".

We need two streams because one is used to train the "deep learning model" and the other is used to test the trained model against new data coming in from the other side.

This is called "online learning".

However, while we do not have a trained model yet, there is nothing to test, so in this case, instead of having two streams, I would rather combine the two partitioned streams into one so that no data is wasted. Later, once the model is created, we can go back to two streams to test and train at the same time.
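The switching behavior described above can be expressed as a small routing rule. The sketch below is plain Python with no Event Hubs SDK involved; `route_event`, `split_events`, the `model_ready` flag, and the convention that partition "1" carries test data are hypothetical names and assumptions for illustration, not part of the original question.

```python
from typing import List, Tuple

# Hypothetical convention (not from the original post): partition "1"
# feeds testing; every other partition is treated as training data.
TEST_PARTITION = "1"


def route_event(partition_id: str, model_ready: bool) -> str:
    """Return which consumer ("train" or "test") should get an event.

    Until a model exists there is nothing to test, so events from all
    partitions are merged into the training stream. Once a model is
    ready, each partition goes back to its own role.
    """
    if not model_ready:
        return "train"
    if partition_id == TEST_PARTITION:
        return "test"
    return "train"


def split_events(events: List[Tuple[str, str]], model_ready: bool):
    """Split (partition_id, body) pairs into (train, test) lists."""
    train, test = [], []
    for partition_id, body in events:
        if route_event(partition_id, model_ready) == "test":
            test.append(body)
        else:
            train.append(body)
    return train, test


if __name__ == "__main__":
    sample = [("0", "a"), ("1", "b"), ("0", "c")]
    print(split_events(sample, model_ready=False))  # (['a', 'b', 'c'], [])
    print(split_events(sample, model_ready=True))   # (['a', 'c'], ['b'])
```

Keeping the decision in one pure function means the "combined" and "split" phases differ only in the value of a single flag, which is easy to flip once a model has been trained.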

I could not find any module in the Event Hubs scripts that lets me combine them. Any suggestions?

Recommended answer

If you can add properties to the data when sending it to the event hub, you can try the steps below.

1. Set two properties on each event.

For test data, add the following two properties:

property_name: "category", its value: "test", which tells you what kind of data you are receiving, i.e. test or train.

property_name: "seqNum", its value is a number such as 0, 1, 2, 3, which is used to determine the order of the data.

For train data, follow the same steps and just change the category value to "train".
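Because each category needs its own running `seqNum`, the sender has to keep one counter per category. A minimal Python sketch of that bookkeeping, assuming the two properties are built as a plain dict before being attached to an event; the `SequenceStamper` class is hypothetical and not part of any Azure SDK.

```python
from collections import defaultdict
from typing import Dict, Union


class SequenceStamper:
    """Builds the "category" and "seqNum" properties for each event.

    Each category ("test" or "train") gets its own counter starting at 0,
    so the receiver can rebuild each stream in order independently.
    """

    ALLOWED = ("test", "train")

    def __init__(self) -> None:
        # Next sequence number to hand out, per category (defaults to 0).
        self._next: Dict[str, int] = defaultdict(int)

    def stamp(self, category: str) -> Dict[str, Union[str, int]]:
        if category not in self.ALLOWED:
            raise ValueError("unknown category: {!r}".format(category))
        seq = self._next[category]
        self._next[category] += 1
        return {"category": category, "seqNum": seq}


if __name__ == "__main__":
    stamper = SequenceStamper()
    print(stamper.stamp("test"))   # {'category': 'test', 'seqNum': 0}
    print(stamper.stamp("test"))   # {'category': 'test', 'seqNum': 1}
    print(stamper.stamp("train"))  # {'category': 'train', 'seqNum': 0}
```

With the current `azure-eventhub` Python package the resulting dict could presumably be assigned to an event's `properties` before sending; check that against the SDK version you use.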

I set these properties in C# as shown below. You can set them however you like; C# is not required:

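        // Assumes the Microsoft.Azure.EventHubs package
        // (using System.Text; using Microsoft.Azure.EventHubs;),
        // with eventHubClient and numMessagesToSend defined elsewhere.
        for (var i = 0; i < numMessagesToSend; i++)
        {
            var message = "555 Message";
            EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));

            // Add the custom properties: seqNum gives the order,
            // category marks the event as "test" (use "train" for training data).
            mydata.Properties.Add("seqNum", i);
            mydata.Properties.Add("category", "test");
            await eventHubClient.SendAsync(mydata);
        }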

Then use the following Python code to receive the data. Here I define two dicts, one to store test data and the other to store train data.

import logging
import asyncio
import os
import sys
import signal
import functools

from azure.eventprocessorhost import (
    AbstractEventProcessor,
    AzureStorageCheckpointLeaseManager,
    EventHubConfig,
    EventProcessorHost,
    EPHOptions
)

# define 2 dictionaries, to store test data and train data respectively.
dict_test={}
dict_train={}

class EventProcessor(AbstractEventProcessor):

    def __init__(self, params=None):       
        super().__init__(params)
        self._msg_counter = 0

    async def open_async(self, context):        
        print("Connection established {}".format(context.partition_id))

    async def close_async(self, context, reason):

        print("Connection closed (reason {}, id {}, offset {}, sq_number {})".format(
            reason,
            context.partition_id,
            context.offset,
            context.sequence_number))

    async def process_events_async(self, context, messages):

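        for m in messages:
            data = m.body_as_str()
            props = m.application_properties
            if props is not None:
                # Property keys (and string values) arrive as bytes here,
                # so look them up with b'...' and decode the category.
                mycategory = props.get(b'category')
                if isinstance(mycategory, bytes):
                    mycategory = mycategory.decode('utf-8')
                # Keep seqNum as a number so the dict keys sort numerically
                # (as strings, "10" would sort before "2").
                mysequence = props.get(b'seqNum')

                if mycategory == 'test':
                    dict_test[mysequence] = data
                elif mycategory == 'train':
                    dict_train[mysequence] = data

                print("Received data: {}".format(data))
        await context.checkpoint_async()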

    async def process_error_async(self, context, error):

        print("Event Processor Error {!r}".format(error))


async def wait_and_close(host):

    await asyncio.sleep(60)
    await host.close_async()

try:
    loop = asyncio.get_event_loop()

    # Storage Account Credentials
    STORAGE_ACCOUNT_NAME = "xxx"
    STORAGE_KEY = "xxxx"
    LEASE_CONTAINER_NAME = "xxx"
    NAMESPACE = "xxx"
    EVENTHUB = "xxx"
    USER = "RootManageSharedAccessKey"
    KEY = "xxxx"

    # Eventhub config and storage manager 
    eh_config = EventHubConfig(NAMESPACE, EVENTHUB, USER, KEY, consumer_group="$default")
    eh_options = EPHOptions()
    eh_options.release_pump_on_timeout = True
    eh_options.debug_trace = False
    storage_manager = AzureStorageCheckpointLeaseManager(
        STORAGE_ACCOUNT_NAME, STORAGE_KEY, LEASE_CONTAINER_NAME)

    # Event loop and host
    host = EventProcessorHost(
        EventProcessor,
        eh_config,
        storage_manager,
        ep_params=["param1","param2"],
        eph_options=eh_options,
        loop=loop)



    tasks = asyncio.gather(
        host.open_async(),
        wait_and_close(host))
    loop.run_until_complete(tasks)

    print("***this is the data for test***")
    print(dict_test)
    print("***-----------------------***")
    print("***this is the data for train***")
    print(dict_train)

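except KeyboardInterrupt:
    # Cancel pending tasks and let the loop process the cancellations
    # (asyncio.Task.all_tasks() no longer exists in Python 3.9+)
    pending = asyncio.all_tasks(loop)
    for task in pending:
        task.cancel()
    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))

finally:
    loop.close()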

[Screenshot: test results]

As the last step: since the test data and train data are stored in separate dictionaries whose keys are the sequence numbers, you can write code that processes each dict to rebuild the test data and train data in sequence order.
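That rebuilding step could look roughly like this. It is a sketch under assumptions: `rebuild_in_sequence` is a hypothetical helper, keys may be ints or numeric strings and are compared as integers, and the gap report (sequence numbers never received, for example because the receiver stopped after its 60-second window) is an addition that goes beyond the original answer.

```python
from typing import Dict, List, Tuple, Union

Key = Union[int, str]


def rebuild_in_sequence(received: Dict[Key, str]) -> Tuple[List[str], List[int]]:
    """Order received payloads by sequence number and report gaps.

    Keys may be ints or numeric strings (for example if str(seqNum) was
    stored); they are converted to int so that "10" sorts after "9".
    Returns (payloads_in_order, missing_sequence_numbers).
    """
    by_seq = {int(k): v for k, v in received.items()}
    if not by_seq:
        return [], []
    ordered = [by_seq[n] for n in sorted(by_seq)]
    # Any number between the smallest and largest seen that never arrived.
    missing = [n for n in range(min(by_seq), max(by_seq) + 1) if n not in by_seq]
    return ordered, missing


if __name__ == "__main__":
    dict_test = {"2": "c", "0": "a", "10": "k", "1": "b"}
    ordered, missing = rebuild_in_sequence(dict_test)
    print(ordered)  # ['a', 'b', 'c', 'k']
    print(missing)  # [3, 4, 5, 6, 7, 8, 9]
```

The same helper can be applied to both `dict_test` and `dict_train`, since each category carries its own independent sequence numbers.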
