Azure Event Hub library for Python

Question

I am using Event Hub to ingest a large number of events. Multiple consumers, running behind a scaling group, read these events from an event hub that has multiple partitions. I was going through the Azure SDK for Python and was confused about what to use: there is EventHubConsumerClient, EventProcessorHost, ...

I would like a library that lets my consumers connect through a consumer group, assigns partitions to each consumer dynamically, and stores checkpoints in a storage account, just as I did with Kafka.

Solution

Update:

For production use, I suggest the stable version of the Event Hub SDK. You can use EPH (Event Processor Host); sample code is here.


With the pre-release azure-eventhub 5.0.0b6, I can use a consumer group and set checkpoints.

The strange thing is that in blob storage I can see two folders created for the event hub: a checkpoint folder and an ownership folder. Inside each folder a blob is created per partition, but the blobs are empty. Stranger still, even though the blobs are empty, every read from the event hub returns only the latest data (that is, it never re-reads data that has already been read in the same consumer group).
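The empty blobs are not necessarily a sign that checkpointing failed. As far as I know, the blob checkpoint store keeps ownership and checkpoint values (such as the offset and sequence number) in each blob's metadata rather than in its content, which would explain zero-byte blobs. The sketch below, which assumes azure-storage-blob 12.x is installed, lists each blob together with its metadata so you can check this yourself. The helper names `describe_blob` and `iter_blob_metadata` are hypothetical, and the exact metadata keys may differ between SDK versions.

```python
from typing import Dict, Iterator, Optional, Tuple


def describe_blob(name: str, metadata: Optional[Dict[str, str]]) -> str:
    """Format one blob's name and metadata on a single line (hypothetical helper)."""
    if not metadata:
        return f"{name}: (no metadata)"
    pairs = ", ".join(f"{key}={value}" for key, value in sorted(metadata.items()))
    return f"{name}: {pairs}"


def iter_blob_metadata(storage_conn_str: str, container: str) -> Iterator[Tuple[str, Dict[str, str]]]:
    """Yield (blob name, metadata) for every blob in the container (hypothetical helper)."""
    # Imported lazily so describe_blob can be used without azure-storage-blob installed.
    from azure.storage.blob import ContainerClient

    client = ContainerClient.from_connection_string(storage_conn_str, container)
    # include=["metadata"] asks the service to return metadata with each listed blob.
    for blob in client.list_blobs(include=["metadata"]):
        yield blob.name, blob.metadata or {}
```

To inspect the container used in this answer, loop over `iter_blob_metadata(STORAGE_CONNECTION_STR, "a22")` and print `describe_blob(name, metadata)` for each pair. If the checkpoint blobs show values in their metadata, the consumer is resuming from them, which is consistent with already-read events not being delivered again.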

Install azure-eventhub 5.0.0b6, and run pip install --pre azure-eventhub-checkpointstoreblob to install azure-eventhub-checkpointstoreblob. For blob storage, install version 12.1.0 of azure-storage-blob, the latest at the time of writing.
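Because the pre-release and stable packages differ, it can help to confirm which versions actually ended up in your environment. This is a minimal sketch using only the standard library (Python 3.8+); `installed_versions` is a hypothetical helper name, and the package list simply repeats the three packages mentioned above.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional

# The three distributions mentioned in the answer.
PACKAGES = (
    "azure-eventhub",
    "azure-eventhub-checkpointstoreblob",
    "azure-storage-blob",
)


def installed_versions(packages: Iterable[str] = PACKAGES) -> Dict[str, Optional[str]]:
    """Map each package name to its installed version, or None if it is missing."""
    result: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


for name, found in installed_versions().items():
    print(f"{name}: {found or 'not installed'}")
```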

I followed this sample, which uses an event-hub-level connection string (NOT a namespace-level connection string). To create one, go to the Azure portal -> your Event Hub namespace -> your event hub instance -> Shared access policies -> click "Add", then specify a policy name and select its permissions. If you only want to receive data, the Listen permission is enough.

After the policy is created, copy its connection string.

Then use the following code:
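An event-hub-level connection string can be recognized by its `EntityPath` segment, which a namespace-level string lacks. The following is a small sketch for checking a string before passing it to the client; `parse_connection_string` and `is_event_hub_level` are hypothetical helpers written for this illustration, not part of the SDK, and the sample values are the placeholders from this answer.

```python
from typing import Dict


def parse_connection_string(conn_str: str) -> Dict[str, str]:
    """Split 'Key=Value;Key=Value' into a dict (hypothetical helper)."""
    parts: Dict[str, str] = {}
    for segment in conn_str.split(";"):
        segment = segment.strip()
        if not segment:
            continue
        # Split on the first '=' only: base64 keys may end with '=' padding.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError(f"Malformed segment: {segment!r}")
        parts[key.strip()] = value.strip()
    return parts


def is_event_hub_level(conn_str: str) -> bool:
    """True if the string names a specific event hub via EntityPath."""
    return bool(parse_connection_string(conn_str).get("EntityPath"))


hub_level = ("Endpoint=sb://ivanehubns.servicebus.windows.net/;"
             "SharedAccessKeyName=saspolicy;SharedAccessKey=xxx;EntityPath=myeventhub")
namespace_level = ("Endpoint=sb://ivanehubns.servicebus.windows.net/;"
                   "SharedAccessKeyName=saspolicy;SharedAccessKey=xxx")

print(is_event_hub_level(hub_level))
print(is_event_hub_level(namespace_level))
```

If you only have a namespace-level string, the stable 5.x client should also accept the event hub name separately through an `eventhub_name` keyword argument, though I have not verified that for 5.0.0b6.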

from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore

CONNECTION_STR = 'Endpoint=sb://ivanehubns.servicebus.windows.net/;SharedAccessKeyName=saspolicy;SharedAccessKey=xxx;EntityPath=myeventhub'
STORAGE_CONNECTION_STR = 'DefaultEndpointsProtocol=https;AccountName=xx;AccountKey=xxx;EndpointSuffix=core.windows.net'


def on_event(partition_context, event):
    # do something with event
    print(event)
    print('on event')
    partition_context.update_checkpoint(event)


if __name__ == '__main__':
    # "a22" is the name of the blob container that holds checkpoint and ownership data
    checkpoint_store = BlobCheckpointStore.from_connection_string(STORAGE_CONNECTION_STR, "a22")

    # "$default" is the consumer group
    client = EventHubConsumerClient.from_connection_string(
        CONNECTION_STR, "$default", checkpoint_store=checkpoint_store)

    try:
        print('ok')
        # receive() blocks and calls on_event for each incoming event
        client.receive(on_event)
    except KeyboardInterrupt:
        pass
    finally:
        # close the client on any exit path, not only on Ctrl+C
        client.close()
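One detail that may explain why only new data shows up on a first run: in the stable 5.x releases, a partition with no checkpoint starts at the position given by the `starting_position` argument of `receive`, which defaults to `"@latest"`. The sketch below assumes a stable azure-eventhub 5.x release (the parameter may be named differently in 5.0.0b6) and shows how to request a replay from the beginning of each partition instead. `receive_kwargs` and `run_consumer` are hypothetical helper names.

```python
from typing import Any, Callable, Dict


def receive_kwargs(replay_from_start: bool) -> Dict[str, Any]:
    """Build keyword arguments for receive() (hypothetical helper).

    "-1" means the beginning of the partition; "@latest" means only new events.
    An existing checkpoint takes precedence over this starting position.
    """
    return {"starting_position": "-1" if replay_from_start else "@latest"}


def run_consumer(conn_str: str, consumer_group: str, checkpoint_store: Any,
                 on_event: Callable[[Any, Any], None],
                 replay_from_start: bool = True) -> None:
    """Receive events until interrupted (hypothetical helper)."""
    # Imported lazily so receive_kwargs can be used without the SDK installed.
    from azure.eventhub import EventHubConsumerClient

    client = EventHubConsumerClient.from_connection_string(
        conn_str, consumer_group, checkpoint_store=checkpoint_store)
    # The context manager closes the client when receive() returns or raises.
    with client:
        client.receive(on_event, **receive_kwargs(replay_from_start))
```

Running the same script in several processes with the same consumer group and checkpoint store is what lets the clients share partitions among themselves, which is the Kafka-like behaviour asked about in the question.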

