Eventhub: reading data as a list in Python
Problem description
I want to use Azure Eventhub as a middleware messaging queue. I am sending simulated data as lists, but at the moment I receive it as strings.
As you can see here, there are only a few formats the data can be converted to. I want the received data to be a list of floats.
Here is the code I am working on right now. I am trying to change the line below so that each event's data is appended to the list as floats.
LIST.append(event_data.message._body)
This is the body of my code.
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"
total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
i = 1
LIST = []
try:
    receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
    client.run()
    start_time = time.time()
    batch = receiver.receive(timeout=None)
    while batch:
        for event_data in batch[-100:]:
            last_offset = event_data.offset
            last_sn = event_data.sequence_number
            print("Received: {}, {}".format(i, last_sn))
            LIST.append(event_data.message._body)
            i += 1
            total += 1
        batch = receiver.receive(timeout=5000)
    end_time = time.time()
    client.stop()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))
except KeyboardInterrupt:
    pass
finally:
    client.stop()
=================================UPDATE ===================================
As a result, the output shows 'Message [a b c ....]'. I think the word 'Message' is being written along with the data, so I want to remove it from the result.
The "sender.py" is as follows:
from azure.eventhub import EventHubClient, Sender, EventData
import time
import logging
import numpy as np
logger = logging.getLogger("azure")
ADDRESS = ""
USER = "RootManageSharedAccessKey"
KEY = ""
try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")
    # Create Event Hubs client
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition="0")
    client.run()
    forging2 = lambda x: (np.exp(-(0.1*x-6)**2+3) + np.exp(-(0.1*x-4)**2+4))*1.4
    x_value = np.arange(100)
    try:
        start_time = time.time()
        for i in range(100):
            y_value1 = forging2(x_value) + np.random.normal(0,1,len(x_value))*3
            y_value1 = np.asarray(y_value1)
            print("Sending message: {}, {}".format(i, y_value1))
            message = y_value1
            sender.send(EventData(message))
            time.sleep(0.35)
    except:
        raise
    finally:
        end_time = time.time()
        client.stop()
        run_time = end_time - start_time
        logger.info("Runtime: {} seconds".format(run_time))
except KeyboardInterrupt:
    pass
Recommended answer
The following fixed code works:
from azure.eventhub import EventHubClient, Sender, EventData
import time
import logging
import numpy as np
logger = logging.getLogger("azure")
ADDRESS = ""
USER = "RootManageSharedAccessKey"
KEY = ""
try:
    if not ADDRESS:
        raise ValueError("No EventHubs URL supplied.")
    # Create Event Hubs client
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    sender = client.add_sender(partition="0")
    client.run()
    forging2 = lambda x: (np.exp(-(0.1*x-6)**2+3) + np.exp(-(0.1*x-4)**2+4))*1.4
    x_value = np.arange(100)
    try:
        start_time = time.time()
        for i in range(100000):
            # Call forging2, the function defined above (forging1 is not defined here)
            y_value1 = forging2(x_value) + np.random.normal(0,1,len(x_value))*3
            y_value1 = np.asarray(y_value1)
            print("Sending message: {}, {}".format(i, y_value1))
            # The fix: send the array as a plain string instead of the raw array object
            message = "{}".format(y_value1)
            sender.send(EventData(message))
            time.sleep(0.35)
    except:
        raise
    finally:
        end_time = time.time()
        client.stop()
        run_time = end_time - start_time
        logger.info("Runtime: {} seconds".format(run_time))
except KeyboardInterrupt:
    pass
In this way, I was able to receive the messages without the word "Message" in them.
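The received body is still a string such as "[ 1.5   2.   -3.25]", so one more step is needed to get the list of floats the question asks for. Below is a minimal sketch of two ways to do that, assuming the body arrives as `str` or `bytes`. The helper names `parse_array_text`, `encode_json` and `decode_json` are hypothetical and not part of the Event Hubs SDK, and the sample string stands in for what `"{}".format(y_value1)` produces.

```python
import json


def parse_array_text(body):
    """Parse the text form of a 1-D NumPy array into a list of floats.

    Assumes the body looks like "[ 1.5   2.   -3.25]" (brackets, whitespace
    separated numbers, possibly wrapped over several lines).
    """
    if isinstance(body, bytes):
        body = body.decode("utf-8")
    # Replace the brackets with spaces; split() then handles both the
    # spaces and the line breaks that appear in long arrays.
    cleaned = body.replace("[", " ").replace("]", " ")
    return [float(token) for token in cleaned.split()]


def encode_json(values):
    """Sender-side alternative: serialise the numbers as a JSON array."""
    return json.dumps([float(v) for v in values])


def decode_json(body):
    """Receiver-side counterpart of encode_json."""
    if isinstance(body, bytes):
        body = body.decode("utf-8")
    return json.loads(body)


# Stand-in for a received body (hypothetical sample data).
sample = "[ 1.5   2.   -3.25]"
print(parse_array_text(sample))

# JSON round trip, simulating bytes on the wire.
payload = encode_json([1.5, 2, -3.25]).encode("utf-8")
print(decode_json(payload))
```

In the receiver, the append line would then become something like `LIST.append(parse_array_text(body))`, where `body` is the string obtained from the event. The JSON variant requires changing the sender to `EventData(encode_json(y_value1))`, but it avoids two limits of the text form: NumPy prints only 8 significant digits by default, and it abbreviates arrays longer than 1000 elements with "...", which the text parser cannot recover.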