仅从单个设备处理Azure IoT中心事件 [英] Process Azure IoT hub events from a single device only

查看:78
本文介绍了仅从单个设备处理Azure IoT中心事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试解决以下问题:部署数千个IoT设备,将所有事件记录到Azure IoT中心,然后仅读取单个deviceid创建的事件.

我一直在玩EventProcessorHost以使类似的工作正常进行,但到目前为止,我只能看到一种从所有设备读取 all 消息的方法./p>

读取所有消息并过滤客户端是不可行的解决方案,因为可能有数百万条消息.

解决方案

Azure IoT中心的主要目的是吸收从设备到云流管道的大量事件,以便对其进行实时分析.默认遥测路径(快速方式)是通过内置的事件中心进行的,其中所有事件都临时存储在EH分区中. 除了该默认终结点(事件),还具有根据规则(条件)将事件消息路由到自定义终结点的功能.

请注意,自定义端点的数量限制为10个,规则的数量限制为100个.如果此限制与您的业务模型匹配,则可以非常容易地分别传输10个设备,如Davis的回答中所述

但是,根据超出此限制(10 + 1)的源(设备)分割遥测流管道,将需要使用其他天蓝色实体(组件).

下图显示了使用Pub/Sub推送模型基于设备拆分遥测流管道的解决方案.

以上解决方案基于使用自定义主题发布者将流事件转发到Azure事件网格的方法. 此处. 事件网格的自定义主题发布者由Azure EventHubTrigger函数表示,其中每个流事件都映射到事件网格事件消息中,主题指示已注册设备.

Azure事件网格是发布/订阅松散耦合的模型,其中事件根据订阅者的订阅传递给订阅者.换句话说,如果没有匹配的传递,事件消息就会消失.

请注意,事件网格路由的能力是每个区域每秒1000万个事件.每个区域的订阅数限制为1000.

使用 REST Api ,该订阅可以动态创建,更新,删除等.

以下代码段显示了将流事件映射到EG事件消息的AF实现的示例.如您所见,它是非常简单的实现:

run.csx:

#r "Newtonsoft.Json"
#r "Microsoft.ServiceBus"


using System.Configuration;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

// reusable client proxy
static HttpClient client = HttpClientHelper.Client(ConfigurationManager.AppSettings["TopicEndpointEventGrid"], ConfigurationManager.AppSettings["aeg-sas-key"]);

// AF
public static async Task Run(EventData ed, TraceWriter log)
{
    log.Info($"C# Event Hub trigger function processed a message:{ed.SequenceNumber}"); 

    // fire EventGrid Custom Topic
    var egevent = new EventGridEvent()
    {
        Id = ed.SequenceNumber.ToString(),
        Subject = $"/iothub/events/{ed.SystemProperties["iothub-message-source"] ?? "?"}/{ed.SystemProperties["iothub-connection-device-id"] ?? "?"}",
        EventType = "telemetryDataInserted",
        EventTime = ed.EnqueuedTimeUtc,
        Data = new
        {
            sysproperties = ed.SystemProperties,
            properties = ed.Properties,
            body = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(ed.GetBytes()))
        }
    };
    await client.PostAsJsonAsync("", new[] { egevent });  
}

// helper
class HttpClientHelper
{
    public static HttpClient Client(string address, string key)
    {      
        var client = new HttpClient() { BaseAddress = new Uri(address) };
        client.DefaultRequestHeaders.Add("aeg-sas-key", key);
        return client;      
    }
}

function.json:

    {
      "bindings": [
        {
         "type": "eventHubTrigger",
         "name": "ed",
         "direction": "in",
         "path": "<yourEventHubName>",
         "connection": "<yourIoTHUB>",
         "consumerGroup": "<yourGroup>",
         "cardinality": "many"
        }
      ],
     "disabled": false
  }

project.json:

{
  "frameworks": {
    "net46":{
      "dependencies": {
        "Microsoft.Azure.EventGrid": "1.1.0-preview"
      }
    }
   }
}

最后,以下屏幕片段显示了AF订户收到的Device1的事件网格事件消息:

I'm trying to solve for having thousands of IoT devices deployed, all logging events to Azure IoT hub, then being able to read events created by a single deviceid only.

I have been playing with EventProcessorHost to get something like this working, but so far I can only see a way to read all messages from all devices.

Its not a feasible solution to read all the messages and filter client side as there may be millions of messages.

解决方案

The major purpose of the Azure IoT Hub is an ingestion of mass events from the devices to the cloud stream pipeline for their analyzing in the real-time manner. The default telemetry path (hot way) is via a built-in Event Hub, where all events are temporary stored in the EH partitions. Besides that default endpoint (events), there is also capability to route an event message to the custom endpoints based on the rules (conditions).

Note, that the number of custom endpoints is limited to 10 and the number of rules to 100. If this limit is matching your business model, you can very easy to stream 10 devices individually, like is described in the Davis' answer.

However, splitting a telemetry stream pipeline based on the sources (devices) over this limit (10+1), it will require to use additional azure entities (components).

The following picture shows a solution for splitting a telemetry stream pipeline based on the devices using a Pub/Sub push model.

The above solution is based on forwarding the stream events to the Azure Event Grid using a custom topic publisher. The event schema for Event Grid eventing is here. The Custom Topic Publisher for Event Grid is represented by Azure EventHubTrigger Function, where each stream event is mapped into the Event Grid event message with a subject indicated a registered device.

The Azure Event Grid is a Pub/Sub loosely decoupled model, where the events are delivered to the subscribers based on their subscribed subscriptions. In other words, if there is no match for delivery, the event message is disappeared.

Note, that the capable of Event Grid routing is 10 millions events per second per region. The limit of the number of subscriptions is 1000 per region.

Using the REST Api, the subscription can be dynamically created, updated, deleted, etc.

The following code snippet shows an example of the AF implementation for mapping the stream event to the EG event message. As you can see it is very straightforward implementation:

run.csx:

#r "Newtonsoft.Json"
#r "Microsoft.ServiceBus"


using System.Configuration;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

// reusable client proxy
static HttpClient client = HttpClientHelper.Client(ConfigurationManager.AppSettings["TopicEndpointEventGrid"], ConfigurationManager.AppSettings["aeg-sas-key"]);

// AF
public static async Task Run(EventData ed, TraceWriter log)
{
    log.Info($"C# Event Hub trigger function processed a message:{ed.SequenceNumber}"); 

    // fire EventGrid Custom Topic
    var egevent = new EventGridEvent()
    {
        Id = ed.SequenceNumber.ToString(),
        Subject = $"/iothub/events/{ed.SystemProperties["iothub-message-source"] ?? "?"}/{ed.SystemProperties["iothub-connection-device-id"] ?? "?"}",
        EventType = "telemetryDataInserted",
        EventTime = ed.EnqueuedTimeUtc,
        Data = new
        {
            sysproperties = ed.SystemProperties,
            properties = ed.Properties,
            body = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(ed.GetBytes()))
        }
    };
    await client.PostAsJsonAsync("", new[] { egevent });  
}

// helper
class HttpClientHelper
{
    public static HttpClient Client(string address, string key)
    {      
        var client = new HttpClient() { BaseAddress = new Uri(address) };
        client.DefaultRequestHeaders.Add("aeg-sas-key", key);
        return client;      
    }
}

function.json:

    {
      "bindings": [
        {
         "type": "eventHubTrigger",
         "name": "ed",
         "direction": "in",
         "path": "<yourEventHubName>",
         "connection": "<yourIoTHUB>",
         "consumerGroup": "<yourGroup>",
         "cardinality": "many"
        }
      ],
     "disabled": false
  }

project.json:

{
  "frameworks": {
    "net46":{
      "dependencies": {
        "Microsoft.Azure.EventGrid": "1.1.0-preview"
      }
    }
   }
}

Finally, the following screen snippet shows an event grid event message received by AF subscriber for Device1:

这篇关于仅从单个设备处理Azure IoT中心事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆