Reading from Kinesis returns empty records when run with a previous sequence number or timestamp

Question

I am trying to read from a Kinesis stream with the help of the get_records() and get_shard_iterator() APIs.

My producer keeps pushing records as it finishes processing them at its end, and the consumer runs as a cron job every 30 minutes. So I tried storing the sequence number of the last message read in my database and using the AFTER_SEQUENCE_NUMBER shard iterator type together with that sequence number. However, this does not work the second time: the first run successfully reads all messages in the stream, but after new messages are pushed, the next run reads nothing.

I also tried using AT_TIMESTAMP with the message timestamp, which the producer pushes to the stream as part of each message and which I store for the next run. Again, the first run processes all messages, and from the second run onward I get empty records.

I am really not sure where I am going wrong. I would appreciate it if someone could help me with this.

The code below uses the timestamp approach, but the sequence number approach is implemented the same way.

import datetime
import json

import boto3

# SETTINGS, mongo_coll, process_message and logger are defined elsewhere in the application.


def listen_to_kinesis_stream():
    kinesis_client = boto3.client('kinesis', region_name=SETTINGS['region_name'])
    stream_response = kinesis_client.describe_stream(StreamName=SETTINGS['kinesis_stream'])

    for shard_info in stream_response['StreamDescription']['Shards']:
        # Resume from the timestamp stored for this shard, or from the epoch on the first run.
        kinesis_stream_status = mongo_coll.find_one({'_id': "DOC_ID"})
        last_read_ts = kinesis_stream_status.get('state', {}).get(
            shard_info['ShardId'], datetime.datetime(1970, 1, 1).strftime("%Y-%m-%dT%H:%M:%S.%f"))

        shard_iterator = kinesis_client.get_shard_iterator(
            StreamName=SETTINGS['kinesis_stream'],
            ShardId=shard_info['ShardId'],
            ShardIteratorType='AT_TIMESTAMP',
            Timestamp=last_read_ts)

        # Read the first record; an empty response makes the loop skip this shard entirely.
        get_response = kinesis_client.get_records(ShardIterator=shard_iterator['ShardIterator'], Limit=1)
        if len(get_response['Records']) == 0:
            continue

        message = json.loads(get_response['Records'][0]['Data'])
        process_resp = process_message(message)
        if process_resp['success'] is False:
            print(process_resp)
        # Save the timestamp of the message just handled as this shard's checkpoint.
        mongo_coll.update_one({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
        print("Processed {0}".format(message))

        # Keep reading one record at a time; the first empty response ends the shard.
        while 'NextShardIterator' in get_response:
            get_response = kinesis_client.get_records(ShardIterator=get_response['NextShardIterator'], Limit=1)
            if len(get_response['Records']) == 0:
                break

            message = json.loads(get_response['Records'][0]['Data'])
            process_resp = process_message(message)
            if process_resp['success'] is False:
                print(process_resp)
            mongo_coll.update_one({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
            print("Processed {0}".format(message))

    logger.debug("Processed all messages from Kinesis stream")
    print("Processed all messages from Kinesis stream")

Recommended answer

According to my discussion with an AWS technical support engineer, some get_records() responses can come back with empty records, so it is not a good idea to break when len(get_response['Records']) == 0.

The better approach they suggested is to keep a counter for the maximum number of messages to read in one run, and to exit the loop only after reading that many messages.
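
A minimal sketch of that counter-based loop is shown below. It is an illustration, not the code AWS support provided. The function name read_shard, its parameters (handle, max_messages, max_empty_polls, pause_seconds) and their defaults are assumptions made for this example. The cap on consecutive empty responses and the check of the MillisBehindLatest field from the GetRecords response are additions beyond the answer, included so that a run still ends when the shard holds fewer than max_messages records.

```python
import json
import time


def read_shard(kinesis_client, shard_iterator, handle,
               max_messages=100, max_empty_polls=10, pause_seconds=0.25):
    """Read up to max_messages records from one shard, tolerating empty batches.

    kinesis_client is expected to behave like boto3.client('kinesis').
    Returns (number of records handled, last sequence number or None).
    """
    processed = 0
    empty_polls = 0
    last_sequence_number = None

    while shard_iterator and processed < max_messages and empty_polls < max_empty_polls:
        response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=1)
        # Always move to the next iterator, even when this batch was empty.
        shard_iterator = response.get('NextShardIterator')

        records = response['Records']
        if not records:
            empty_polls += 1
            # MillisBehindLatest == 0 means the reader has caught up with the shard tip.
            if response.get('MillisBehindLatest') == 0:
                break
            # Pause briefly to stay under the per-shard read rate limit.
            time.sleep(pause_seconds)
            continue

        empty_polls = 0
        for record in records:
            handle(json.loads(record['Data']))
            last_sequence_number = record['SequenceNumber']
            processed += 1

    return processed, last_sequence_number
```

The returned sequence number can be stored per shard and passed as StartingSequenceNumber with the AFTER_SEQUENCE_NUMBER iterator type on the next cron run, which matches the checkpointing described in the question.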
