Iterate nested list in JSON message with a Stream Analytics query


Question

I have a JSON message coming from IoT Hub like this:

{
    "deviceId": "abc",
    "topic": "data",
    "data": {
        "varname1": [{
            "t": "timestamp1",
            "v": "value1",
            "f": "respondFrame1"
        },
        {
            "t": "timestamp2",
            "v": "value2",
            "f": "respondFrame2"
        }],
        "varname2": [{
            "t": "timestamp1",
            "v": "value1",
            "f": "respondFrame1"
        },
        {
            "t": "timestamp2",
            "v": "value2",
            "f": "respondFrame2"
        }]
    }
}

I want to store this, via an Azure Stream Analytics job, in a Transact-SQL table like this:

ID   |   deviceId |  varname  |  timestamp  |  respondFrame  | value
-----+------------+-----------+-------------+----------------+--------
1    |   abc      |  varname1 |  timestamp1 |  respondFrame1 | value1
2    |   abc      |  varname1 |  timestamp2 |  respondFrame2 | value2
3    |   abc      |  varname2 |  timestamp1 |  respondFrame1 | value1
4    |   abc      |  varname2 |  timestamp2 |  respondFrame2 | value2

Does anybody know how to handle these nested iterations and combine them (CROSS APPLY)?

Something like this "phantomCode":

# msg is assumed to be the parsed JSON message (a dict)
device_id = msg["deviceId"]
for varname, samples in msg["data"].items():
    for sample in samples:
        timestamp = sample["t"]
        frame = sample["f"]
        value = sample["v"]

UPDATE regarding JS Azure's answer:

With this code:

WITH datalist AS
(
    SELECT   
        iotHubAlias.deviceId,  
        data.PropertyName as varname,  
        data.PropertyValue as arrayData 
    FROM [iotHub] as iotHubAlias  
    CROSS APPLY GetRecordProperties(iotHubAlias.data) AS data
    WHERE iotHubAlias.topic = 'data'
)
SELECT
    datalist.deviceId,
    datalist.varname,
    arrayElement.ArrayValue.t as [timestamp],
    arrayElement.ArrayValue.f as respondFrame,
    arrayElement.ArrayValue.v as value
INTO [temporary]
FROM datalist 
CROSS APPLY GetArrayElements(datalist.arrayData) AS arrayElement

I always get this error:

{
    "channels": "Operation",
    "correlationId": "f9d4437b-707e-4892-a37b-8ad721eb1bb2",
    "description": "",
    "eventDataId": "ef5a5f2b-8c2f-49c2-91f0-16213aaa959d",
    "eventName": {
        "value": "streamingNode0",
        "localizedValue": "streamingNode0"
    },
    "category": {
        "value": "Administrative",
        "localizedValue": "Administrative"
    },
    "eventTimestamp": "2018-08-21T18:23:39.1804989Z",
    "id": "/subscriptions/46cd2f8f-b46b-4428-8f7b-c7d942ff745d/resourceGroups/fieldtest/providers/Microsoft.StreamAnalytics/streamingjobs/streamAnalytics4fieldtest/events/ef5a5f2b-8c2f-49c2-91f0-16213aaa959d/ticks/636704726191804989",
    "level": "Error",
    "operationId": "7a38a957-1a51-4da1-a679-eae1c7e3a65b",
    "operationName": {
        "value": "Process Events: Processing events Runtime Error",
        "localizedValue": "Process Events: Processing events Runtime Error"
    },
    "resourceGroupName": "fieldtest",
    "resourceProviderName": {
        "value": "Microsoft.StreamAnalytics",
        "localizedValue": "Microsoft.StreamAnalytics"
    },
    "resourceType": {
        "value": "Microsoft.StreamAnalytics/streamingjobs",
        "localizedValue": "Microsoft.StreamAnalytics/streamingjobs"
    },
    "resourceId": "/subscriptions/46cd2f8f-b46b-4428-8f7b-c7d942ff745d/resourceGroups/fieldtest/providers/Microsoft.StreamAnalytics/streamingjobs/streamAnalytics4fieldtest",
    "status": {
        "value": "Failed",
        "localizedValue": "Failed"
    },
    "subStatus": {
        "value": "",
        "localizedValue": ""
    },
    "submissionTimestamp": "2018-08-21T18:24:34.0981187Z",
    "subscriptionId": "46cd2f8f-b46b-4428-8f7b-c7d942ff745d",
    "properties": {
        "Message Time": "2018-08-21 18:23:39Z",
        "Error": "- Unable to cast object of type 'Microsoft.EventProcessing.RuntimeTypes.ValueArray' to type 'Microsoft.EventProcessing.RuntimeTypes.IRecord'.\r\n",
        "Message": "Runtime exception occurred while processing events, - Unable to cast object of type 'Microsoft.EventProcessing.RuntimeTypes.ValueArray' to type 'Microsoft.EventProcessing.RuntimeTypes.IRecord'.\r\n, : OutputSourceAlias:temporary;",
        "Type": "SqlRuntimeError",
        "Correlation ID": "f9d4437b-707e-4892-a37b-8ad721eb1bb2"
    },
    "relatedEvents": []
}

And here is an example of a real JSON message coming from a device:

{
    "topic": "data",
    "data": {
        "ExternalFlowTemperatureSensor": [{
            "t": "2018-08-22T11:00:11.955381",
            "v": 16.64103,
            "f": "Q6ES8KJIN1NX2DRGH36RX1WDT"
        }],
        "AdaStartsP2": [{
            "t": "2018-08-22T11:00:12.863383",
            "v": 382.363138,
            "f": "9IY7B4DFBAMOLH3GNKRUNUQNUX"
        },
        {
            "t": "2018-08-22T11:00:54.172501",
            "v": 104.0,
            "f": "IUJMP20CYQK60B"
        }],
        "s_DriftData[4].c32_ZeitLetzterTest": [{
            "t": "2018-08-22T11:01:01.829568",
            "v": 348.2916,
            "f": "MMTPWQVLL02CA"
        }]
    },
    "deviceId": "test_3c27db"
}

And, for completeness, the creation code for the SQL table:

create table temporary (
    id int NOT NULL IDENTITY PRIMARY KEY,
    deviceId nvarchar(20) NOT NULL,
    timestamp datetime NOT NULL,
    varname nvarchar(100) NOT NULL,
    value float,
    respondFrame nvarchar(50)
    )

Recommended answer

The following query will give you the expected output:

WITH step1 AS
(
    -- One row per property of "data": the variable name and its array of samples
    SELECT
        event.deviceId,
        data.PropertyName AS varname,
        data.PropertyValue AS arrayData
    FROM blobtest AS event
    CROSS APPLY GetRecordProperties(event.data) AS data
)
-- One row per element of each variable's array
SELECT
    event.deviceId,
    event.varname,
    arrayElement.ArrayValue.t AS [timestamp],
    arrayElement.ArrayValue.f AS frame,
    arrayElement.ArrayValue.v AS value
FROM step1 AS event
CROSS APPLY GetArrayElements(event.arrayData) AS arrayElement
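
To see what the two CROSS APPLY steps in the query above produce, the same flattening can be mimicked offline in plain Python. This is only a rough sketch, not Stream Analytics itself: `record_properties`, `array_elements`, and `flatten` are hypothetical helper names that imitate the PropertyName/PropertyValue and ArrayIndex/ArrayValue shapes returned by GetRecordProperties and GetArrayElements, and the message is an abbreviated copy of the device message from the question.

```python
import json

# Abbreviated copy of the real device message from the question.
MESSAGE = json.loads("""
{
    "topic": "data",
    "data": {
        "ExternalFlowTemperatureSensor": [
            {"t": "2018-08-22T11:00:11.955381", "v": 16.64103, "f": "Q6ES8KJIN1NX2DRGH36RX1WDT"}
        ],
        "AdaStartsP2": [
            {"t": "2018-08-22T11:00:12.863383", "v": 382.363138, "f": "9IY7B4DFBAMOLH3GNKRUNUQNUX"},
            {"t": "2018-08-22T11:00:54.172501", "v": 104.0, "f": "IUJMP20CYQK60B"}
        ]
    },
    "deviceId": "test_3c27db"
}
""")


def record_properties(record):
    """Hypothetical stand-in for GetRecordProperties: one entry per property."""
    return [{"PropertyName": k, "PropertyValue": v} for k, v in record.items()]


def array_elements(array):
    """Hypothetical stand-in for GetArrayElements: one entry per array element."""
    return [{"ArrayIndex": i, "ArrayValue": v} for i, v in enumerate(array)]


def flatten(event):
    """Return one row per sample, mirroring the two CROSS APPLY steps."""
    rows = []
    for prop in record_properties(event["data"]):            # first CROSS APPLY
        for elem in array_elements(prop["PropertyValue"]):   # second CROSS APPLY
            sample = elem["ArrayValue"]
            rows.append({
                "deviceId": event["deviceId"],
                "varname": prop["PropertyName"],
                "timestamp": sample["t"],
                "frame": sample["f"],
                "value": sample["v"],
            })
    return rows


rows = flatten(MESSAGE)
for row in rows:
    print(row)
```

Each variable under "data" contributes as many rows as its array has elements, so the abbreviated message yields three rows. Running a captured message through a sketch like this can help confirm the expected row shape before testing the query in the job.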

You can find more information about JSON parsing on our documentation page "Parse JSON and Avro data in Azure Stream Analytics".

Let me know if you have any other questions.

JS (Azure Stream Analytics)
