Capture chunked HTTP stream data from EventHub in Azure Data Lake Store


Problem description

Hi,

I am consuming a streaming API using Python and trying to send the data to an EventHub that has Capture set up to Azure Data Lake Store.

I was able to do the setup for the demo suggested in MSDN here:

https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-python-get-started-send

This, I see, keeps creating a 508-byte file, which looks like this (screenshot omitted).

But it did contain another file which had the actual data (the 100 dummy messages).

And I was able to query the data in that avro file (after downloading it for a test) using the code:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

# Open the downloaded capture file and print every record in it
reader = DataFileReader(open("C:\\Users\\abc\\Downloads\\17.avro", "rb"), DatumReader())
for record in reader:
    print(record)
reader.close()
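
For what it's worth, the payload of each captured event sits in the record's "Body" field (alongside SequenceNumber, Offset, EnqueuedTimeUtc and the property maps that the capture schema adds). A minimal sketch of pulling the original message back out of a downloaded capture file, assuming the events were sent as UTF-8 JSON strings:

import json
from avro.datafile import DataFileReader
from avro.io import DatumReader

# Each capture record wraps one event; "Body" holds the original payload as bytes
with open("C:\\Users\\abc\\Downloads\\17.avro", "rb") as f:
    reader = DataFileReader(f, DatumReader())
    for record in reader:
        payload = record["Body"].decode("utf-8")   # assumes UTF-8 text was sent
        print(json.loads(payload))                 # assumes each event is one JSON document
    reader.close()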

So, all fine there.

However, when I try to send chunked data to it (an object of the aiohttp.StreamReader class), it does not fail, but when I go and check the Data Lake Store I see a whole bunch of data folders, and each one of those folders has only a single "avro" file (the 508-byte file), the same as in my first screenshot.

I understand that the avro file will contain both the schema and the data, but why does it keep creating that 508-byte file in separate folders, even when I am not sending any data to the event hub (even in the demo sample from MSDN)? I then turned data capture off in the Event Hub and it stopped.

And how do I send chunked data so that it is correctly captured in ADLS? Mind you, the program does not complain about any error.

Here is the sample code snippet that I am using:

async def fetch(session, url, headers, sender):
    with async_timeout.timeout(None):
        async with session.get(init_res.headers['Location'], headers=headers, proxy="http://127.0.0.1:8888", allow_redirects=False, timeout=None) as r:
            while True:
                chunk = await r.content.read(1024 * 3)
                if not chunk:
                    break
                # forward each raw chunk straight to Event Hub
                sender.send(EventData(chunk))

async def main(url, headers, sender):
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, url, headers, sender)

In the caller:

client = EventHubClient(EventHubAddress, debug=True, username=USER, password=KEY)

sender = client.add_sender(partition=None)
client.run()
......
......
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loc, headers=headers, sender=sender))

How do I make ADLS get this chunked data?


Update: I have also tried receiving data directly from the Event Hub in Python and it does show me the data that I pushed. So why is the capture to ADLS not working?
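
(For reference, that kind of sanity check could look roughly like the sketch below with the same pre-v5 azure-eventhub SDK that the sender code uses; the consumer group, partition and offset values here are assumptions, not taken from the original post.)

from azure.eventhub import EventHubClient, Offset

# Read a few events back from one partition to confirm the sends are landing
client = EventHubClient(EventHubAddress, debug=True, username=USER, password=KEY)
receiver = client.add_receiver("$default", partition="0", offset=Offset("-1"))
client.run()
try:
    for event in receiver.receive(timeout=30):
        print(event.body_as_str())
finally:
    client.stop()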

Solution

Ok, I figured it out. 

The incoming stream I had was JSON. So it seems I had to send only fully formed JSON messages to Event Hub, and then the data capture to Azure Data Lake Store worked. This of course slows down the sending of the stream to the event hub. So, this is what I had to do:

async def fetch(session, url, headers, sender):
    with async_timeout.timeout(None):
        async with session.get(init_res.headers['Location'], headers=headers, proxy="http://127.0.0.1:8888", allow_redirects=False, timeout=None) as r:
            buff = ""
            while True:
                chunk = await r.content.read(1024 * 3)
                if not chunk:
                    break
                else:
                    chunk = chunk.decode("utf-8")
                    # one message from the API ends with CRLF; accumulate until we see it
                    ind = chunk.find("\r\n")
                    if ind == -1:
                        buff = buff + chunk
                    else:
                        # send everything up to the CRLF as one complete message
                        buff = buff + chunk[0:ind]
                        print(buff)
                        sender.send(EventData(buff))
                        # keep whatever follows the CRLF for the next message
                        buff = chunk[ind + 2:]

async def main(url, headers, sender):
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, url, headers, sender)

So, here I am checking for CRLF (that is how one message from the API is delimited) and then sending it to Event Hub. I am wondering why Data Lake Store couldn't pick up the data from the EventHub as it was. Does it really have to be well-formed JSON? Shouldn't it act like just a dump in this case?

Anyway, it works now with the code above (though slower), but something is better than nothing.

If someone knows a better way to do this, I am all ears and eyes.
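
One possible refinement, sketched here under the same assumptions as the code above (aiohttp session, the sender from the caller, and init_res coming from the surrounding program): split the buffer on every CRLF, so that a chunk carrying more than one message boundary still produces one event per message.

async def fetch(session, url, headers, sender):
    with async_timeout.timeout(None):
        async with session.get(init_res.headers['Location'], headers=headers, proxy="http://127.0.0.1:8888", allow_redirects=False, timeout=None) as r:
            buff = ""
            while True:
                chunk = await r.content.read(1024 * 3)
                if not chunk:
                    break
                buff += chunk.decode("utf-8")
                # everything before the last CRLF is complete; keep the tail for later
                *messages, buff = buff.split("\r\n")
                for message in messages:
                    if message:
                        sender.send(EventData(message))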


