使用REST API时,选择Azure的事件枢纽分区 [英] Selecting Partition of Azure event hub when using rest api

查看:303
本文介绍了使用REST API时,选择Azure的事件枢纽分区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图使用python和REST API将消息发送到Azure的活动中心
经过一些失败的实验我已经找到工作code(见下文),但我希望能够选择到哪个分区发送的事件。

这是可能使用REST API,如果是这样应该怎么做呢?

 #!/用户/斌/蟒蛇
进口JSON
从日期时间日期时间进口
从多进口游泳池
#从azure.servicebus进口_service_bus_error_handler从azure.servicebus.servicebusservice进口ServiceBusService,ServiceBusSASAuthentication从azure.http进口(
    HTT prequest,
    HTTPError这样的
)从azure.http.httpclient进口_HTTPClientEVENT_HUB_HOST =mysecrethub.servicebus.windows.net
EVENT_HUB_NAME =secerthub名
KEYNAME =senderkey#需要从ENV加载
KEYVALUE =键值#需要从ENV加载
EXTRA_HEADERS = []
NUM_OF_PARTITIONS = 16
类EventHubClient(对象):    高清__init __(自我,主机,hubname,键名,键值):
        self._host =主机
        self._hub = hubname
        self._keyname =键名
        self._key =键值    高清的sendMessage(自我,身体,分区=无,additional_headers =无):
        eventHubHost = self._host        HttpClient的= _HTTPClient(service_instance =个体经营)        sasKeyName = self._keyname
        sasKeyValue = self._key        验证= ServiceBusSASAuthentication(sasKeyName,sasKeyValue)        要求= HTT prequest()
        request.method =POST
        request.host = eventHubHost
        request.protocol_override =htt​​ps开头
        Request的=/%S /消息?API版本= 2014-01%(self._hub)
        request.body =体
        request.headers.append(('内容类型','应用程序/原子+ XML;类型=项;字符集= utf-8'))
        如果additional_headers不是无:
            在additional_headers项目:
                request.headers.append(项目)
        如果分区不是无:
            值= json.dumps({'PartitionKey:分区})
            request.headers.append(('BrokerProperties',值))
        authentication.sign_request(请求的HttpClient)        request.headers.append((内容长度,STR(LEN(request.body))))        状态= 0        尝试:
            RESP = httpclient.perform_request(要求)
            状态= resp.status
        除了HTTPError这样的例如:
            状态= ex.status
        #打印request.headers
        返回状态
DEF prepare_message(APPID,会话ID,partitionKey =无,SessionEllapsed =无,DeviceOs =无):
    消息= {姓名:MonitorEvent}
    属性= {的AppId:APPIDSessionStarted:。加入(STR(datetime.now())[: - 3])}
    如果SessionEllapsed不是无:
        属性['SessionEllapsed'] = SessionEllapsed
    如果DeviceOs不是无:
        属性['DeviceOs'] = DeviceOs
    如果partitionKey不是无:
        消息[PartitionKey] = STR(partitionKey)
        消息[的partitionid] = STR(partitionKey)
        属性['ITEMID'] = partitionKey
    消息['属性'] =属性
    返回json.dumps(消息)
高清send_mo​​nitoring_event(分区):
    hubClient = EventHubClient(EVENT_HUB_HOST,EVENT_HUB_NAME,KEYNAME,KEYVALUE)
    的appid = 1
    sendertime = datetime.now()的strftime('%Y%M%D-%H%M%S')
    消息= prepare_message(APPID,sendertime,partitionKey =分区,SessionEllapsed = 1,DeviceOs ='监视'+ STR(分区))
    #打印消息
    hubStatus = hubClient.sendMessage(消息,分区=无,additional_headers = EXTRA_HEADERS)
#返回HTTP状态给调用者
    返回hubStatus
高清的main():
    池=池(进程= NUM​​_OF_PARTITIONS)    打印pool.map(send_mo​​nitoring_event,范围(NUM_OF_PARTITIONS))如果__name__ =='__main__':
    主要()


解决方案

继节发送事件事件枢纽REST API的docunment的 https://msdn.microsoft.com/en-us/library/azure/dn790664.aspx ,则不能使用请求URI的https:/ /{serviceNamespace}.servicebus.windows.net/{eventHubPath}/messages选择到哪个分区发送事件。

您应该使用请求URI的https:// {} serviceNamespace .servicebus.windows.net / {} eventHubPath /出版/ {}的DeviceID /消息。属性{}的DeviceID是分区键有什么用组/分区的设备,无论是地理位置,设备类型,版本,租客,等等。

但分割数必须是数字2和32之间。因此,如果你需要使用超过32个分区,我建议把钥匙插进事件数据。

最好的问候。

I'm trying to send messages to Azure Event Hub using python and the rest API after some failed experiments i have found working code (see below) but i want to be able to select to which partition to send the event.

Is this possible using the rest API and if so how should be done?

#!/user/bin/python
import json
from datetime import datetime
from multiprocessing import Pool
# from azure.servicebus import _service_bus_error_handler

from azure.servicebus.servicebusservice import ServiceBusService, ServiceBusSASAuthentication

from azure.http import (
    HTTPRequest,
    HTTPError
)

from azure.http.httpclient import _HTTPClient

EVENT_HUB_HOST = "mysecrethub.servicebus.windows.net"
EVENT_HUB_NAME = "secerthub-name"
KEYNAME = "senderkey"  # needs to be loaded from ENV
KEYVALUE = "keyvalue"  # needs to be loaded from ENV
EXTRA_HEADERS = []
NUM_OF_PARTITIONS = 16


class EventHubClient(object):

    def __init__(self, host, hubname, keyname, keyvalue):
        self._host = host
        self._hub = hubname
        self._keyname = keyname
        self._key = keyvalue

    def sendMessage(self, body, partition=None, additional_headers=None):
        eventHubHost = self._host

        httpclient = _HTTPClient(service_instance=self)

        sasKeyName = self._keyname
        sasKeyValue = self._key

        authentication = ServiceBusSASAuthentication(sasKeyName, sasKeyValue)

        request = HTTPRequest()
        request.method = "POST"
        request.host = eventHubHost
        request.protocol_override = "https"
        request.path = "/%s/messages?api-version=2014-01" % (self._hub)
        request.body = body
        request.headers.append(('Content-Type', 'application/atom+xml;type=entry;charset=utf-8'))
        if additional_headers is not None:
            for item in additional_headers:
                request.headers.append(item)
        if partition is not None:
            value = json.dumps({'PartitionKey': partition})
            request.headers.append(('BrokerProperties', value))
        authentication.sign_request(request, httpclient)

        request.headers.append(('Content-Length', str(len(request.body))))

        status = 0

        try:
            resp = httpclient.perform_request(request)
            status = resp.status
        except HTTPError as ex:
            status = ex.status
        # print request.headers
        return status


def prepare_message(appid, sessionid, partitionKey=None, SessionEllapsed=None, DeviceOs=None):
    message = {"Name": "MonitorEvent"}
    Attributes = {"AppId": appid, "SessionStarted": "".join(str(datetime.now())[:-3])}
    if SessionEllapsed is not None:
        Attributes['SessionEllapsed'] = SessionEllapsed
    if DeviceOs is not None:
        Attributes['DeviceOs'] = DeviceOs
    if partitionKey is not None:
        message["PartitionKey"] = str(partitionKey)
        message["PartitionId"] = str(partitionKey)
        Attributes['ItemId'] = partitionKey
    message['Attributes'] = Attributes
    return json.dumps(message)


def send_monitoring_event(partition):
    hubClient = EventHubClient(EVENT_HUB_HOST, EVENT_HUB_NAME, KEYNAME, KEYVALUE)
    appid = 1
    sendertime = datetime.now().strftime('%Y%M%d-%H%M%S')
    message = prepare_message(appid, sendertime, partitionKey=partition, SessionEllapsed=1, DeviceOs='Monitor' + str(partition))
    # print message
    hubStatus = hubClient.sendMessage(message, partition=None, additional_headers=EXTRA_HEADERS)
# return the HTTP status to the caller
    return hubStatus


def main():
    pool = Pool(processes=NUM_OF_PARTITIONS)

    print pool.map(send_monitoring_event, range(NUM_OF_PARTITIONS))

if __name__ == '__main__':
    main()

解决方案

Following the section 'Send Event' of Event Hubs REST APIs docunment https://msdn.microsoft.com/en-us/library/azure/dn790664.aspx, you can't use the Request URI https://{serviceNamespace}.servicebus.windows.net/{eventHubPath}/messages to select to which partition to send events.

You should use the Request URI https://{serviceNamespace}.servicebus.windows.net/{eventHubPath}/publishers/{deviceId}/messages. The attribute {deviceId} is partition key what used to group/partition devices—whether it is geo-location, device type, version, tenant, and so on.

But the partition count must be a number between 2 and 32. So if you need to use more than 32 partitions, I suggest to put the key into the event data.

Best Regards.

这篇关于使用REST API时,选择Azure的事件枢纽分区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆