How to receive only the recent data in Event Hub

Question

In Event Hub, I have a "sender" script and a "receiver" script that communicate with each other.

The problem is that I seem to receive the dataset I sent yesterday together with the one I just sent. I am trying to control the amount of data I receive, either by time period or by number of events.

The basic receiver code is as follows:


import time

from azure.eventhub import EventHubClient, Offset  # legacy azure-eventhub 1.x API

ADDRESS = "amqps://xxx.servicebus.windows.net/xxx"
USER = "RootManageSharedAccessKey"
KEY = "xxx"

CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")  # "-1" starts at the beginning of the partition, so old events are read too
PARTITION = "0"

total = 0
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(
        CONSUMER_GROUP, PARTITION, prefetch=0, offset=OFFSET)
    client.run()
    start_time = time.time()
    batch = receiver.receive(timeout=100)

    # Only the last 10 events of the received batch are printed
    for event_data in batch[-10:]:
        print("Received: {}".format(event_data.body_as_str(encoding='UTF-8')))
        total += 1

    end_time = time.time()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass
finally:
    client.stop()
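Note that batch[-10:] in the code above does not reduce how much is read: the slice is applied only after the batch has been received, so it keeps the last 10 events of whatever the receiver returned. The sketch below contrasts "keep the last N" with "stop after N" on a plain list standing in for the partition. The partition list and the last_n/first_n helpers are hypothetical, not part of azure-eventhub. In the 1.x SDK, receive() also has a max_batch_size argument that caps the batch size; check its exact behaviour against your installed version.

```python
from collections import deque
from itertools import islice
from typing import Iterable, List


def last_n(events: Iterable[str], n: int) -> List[str]:
    """Keep only the n most recent events; every event is still read first."""
    return list(deque(events, maxlen=n))


def first_n(events: Iterable[str], n: int) -> List[str]:
    """Stop after n events, like capping the batch size at the receive call."""
    return list(islice(events, n))


# Hypothetical partition contents: 5 old events from yesterday, 3 new ones today.
partition = ["old-%d" % i for i in range(5)] + ["new-%d" % i for i in range(3)]

print(last_n(partition, 3))   # ['new-0', 'new-1', 'new-2']
print(first_n(partition, 3))  # ['old-0', 'old-1', 'old-2']
```

Capping the count from the front still returns yesterday's events when the start position is the beginning of the partition, so the start position is what needs to change.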

Recommended Answer

I just found a solution that uses the offset to control which events are read.

The first step is to get the offset of each event.

The code:

import logging
import time

from azure.eventhub import EventHubClient, Offset  # legacy azure-eventhub 1.x API

logger = logging.getLogger("azure")

ADDRESS = "amqps://xxx.servicebus.windows.net/xxx"
USER = "RootManageSharedAccessKey"
KEY = "xxx"

CONSUMER_GROUP = "$default"

# First, set the offset to -1 to read all the event data
OFFSET = Offset("-1")
PARTITION = "0"

total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(
        CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
    client.run()
    start_time = time.time()
    print("**begin receive**")
    for event_data in receiver.receive(timeout=100):
        last_offset = event_data.offset.value
        last_sn = event_data.sequence_number
        # Print the body, offset and sequence number of each event
        print("Received: {}, last_offset: {}, last_sn: {}".format(
            event_data.body_as_str(encoding='UTF-8'), last_offset, last_sn))
        total += 1

    end_time = time.time()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass
finally:
    client.stop()
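The script above records last_offset for every event it reads. One way to avoid re-reading yesterday's data on the next run (a suggestion, not part of the original answer) is to save that value when the script finishes and pass it back as the start offset the next time. The load_checkpoint/save_checkpoint helpers and the checkpoint.json file name are hypothetical; they only handle the file part, and the loaded string would go into OFFSET = Offset(load_checkpoint(path)).

```python
import json
import os
import tempfile


def load_checkpoint(path: str, default: str = "-1") -> str:
    """Return the last saved offset, or `default` ("-1" = start of partition)."""
    if not os.path.exists(path):
        return default
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)["last_offset"]


def save_checkpoint(path: str, last_offset) -> None:
    """Persist the offset of the last processed event as a string."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump({"last_offset": str(last_offset)}, f)
    # Replace in one step so a crash does not leave a half-written checkpoint
    os.replace(tmp_path, path)


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
    print(load_checkpoint(path))  # prints -1 (nothing saved yet)
    save_checkpoint(path, "237080")
    print(load_checkpoint(path))  # prints 237080
```

If the offset filter excludes the value you pass (which is what the answer's 237079 trick relies on), restarting from the saved last offset returns only events that arrived after it.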

After running it, you can see the offset of every event in the output (shown as a screenshot in the original answer).

Now you know the offset of each event. Suppose you want the events from number 40 to number 53, and the offset of number 40 is 237080. In your code, set the offset to a value just below 237080, i.e. 237079, in this line: OFFSET = Offset("237079"). A value just below is needed because the offset filter returns only events whose offset is greater than the value you pass.
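To illustrate that comparison, the simulation below uses made-up offsets around the 237080 from the answer; the EVENTS list and the select_from helper are hypothetical, and offsets are treated as integers for simplicity. It shows that "greater than 237079" and "greater than or equal to 237080" pick the same events, while "greater than 237080" skips event 40. If I recall the 1.x SDK correctly, Offset("237080", inclusive=True) expresses the second form directly; verify that against your installed version.

```python
from typing import List, Tuple

# Hypothetical events as (event number, offset). Offsets are byte positions,
# so they increase but are not consecutive.
EVENTS: List[Tuple[int, int]] = [
    (38, 236800),
    (39, 236940),
    (40, 237080),
    (41, 237220),
    (42, 237360),
]


def select_from(events: List[Tuple[int, int]], offset: int,
                inclusive: bool = False) -> List[int]:
    """Return event numbers whose offset is > offset (or >= if inclusive)."""
    if inclusive:
        return [num for num, off in events if off >= offset]
    return [num for num, off in events if off > offset]


print(select_from(EVENTS, 237079))                  # [40, 41, 42]
print(select_from(EVENTS, 237080, inclusive=True))  # [40, 41, 42]
print(select_from(EVENTS, 237080))                  # [41, 42]
```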

The code:

import logging
import time

from azure.eventhub import EventHubClient, Offset  # legacy azure-eventhub 1.x API

logger = logging.getLogger("azure")

ADDRESS = "amqps://xxx.servicebus.windows.net/xx"
USER = "RootManageSharedAccessKey"
KEY = "xxx"

CONSUMER_GROUP = "$default"

# Set the offset just below the offset of the first event you want (237080)
OFFSET = Offset("237079")
PARTITION = "0"

total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
    receiver = client.add_receiver(
        CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
    client.run()
    start_time = time.time()
    print("**begin receive**")
    for event_data in receiver.receive(timeout=100):
        last_offset = event_data.offset.value
        last_sn = event_data.sequence_number
        print("Received: {}, last_offset: {}, last_sn: {}".format(
            event_data.body_as_str(encoding='UTF-8'), last_offset, last_sn))
        total += 1

    end_time = time.time()
    run_time = end_time - start_time
    print("Received {} messages in {} seconds".format(total, run_time))

except KeyboardInterrupt:
    pass
finally:
    client.stop()

After running the code, only the events from the specified offset onward are returned (shown as a screenshot in the original answer).
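The answer starts from a known offset. For the "time period" part of the question, Event Hubs can also filter on an event's enqueued time, expressed in milliseconds since the Unix epoch, and as far as I know the 1.x Offset class builds such a filter when it is given a datetime instead of a string. The sketch below only reproduces the filter strings on the client side so the forms can be compared. build_selector is a hypothetical helper that mirrors my understanding of the SDK, not an SDK function, so check the library source before relying on the exact format.

```python
import calendar
import datetime
from typing import Union


def build_selector(value: Union[str, int, datetime.datetime],
                   inclusive: bool = False) -> str:
    """Hypothetical re-creation of an Event Hubs AMQP start-position filter."""
    operator = ">=" if inclusive else ">"
    if isinstance(value, datetime.datetime):
        # Enqueued-time filter in epoch milliseconds; naive datetimes count as UTC.
        millis = calendar.timegm(value.utctimetuple()) * 1000 + value.microsecond // 1000
        return "amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, millis)
    if isinstance(value, int):
        # An integer is treated as a sequence number.
        return "amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, value)
    # Anything else is treated as an offset string, e.g. "-1" or "237079".
    return "amqp.annotation.x-opt-offset {} '{}'".format(operator, value)


# Only events enqueued during the last hour
one_hour_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
print(build_selector(one_hour_ago))

print(build_selector(datetime.datetime(2019, 1, 1, tzinfo=datetime.timezone.utc)))
# amqp.annotation.x-opt-enqueued-time > '1546300800000'
print(build_selector("237079"))
# amqp.annotation.x-opt-offset > '237079'
```

If the SDK behaves as described, passing Offset(one_hour_ago) to add_receiver would skip yesterday's events without first looking up an offset.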
