读取由 Amazon Kinesis Firehose 流写入 s3 的数据 [英] Reading the data written to s3 by Amazon Kinesis Firehose stream

查看:41
本文介绍了读取由 Amazon Kinesis Firehose 流写入 s3 的数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在将记录写入 Kinesis Firehose 流,最终由 Amazon Kinesis Firehose 写入 S3 文件.

我的记录对象看起来像

ItemPurchase {字符串personId,字符串 itemId}

写入 S3 的数据如下所示:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}

没有逗号分隔.

没有像在 Json 数组中那样的起始括号

<预><代码>[

在 Json 数组中没有结束括号

<预><代码>]

我想读取此数据以获取 ItemPurchase 对象的列表.

List购买 = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))

读取这些数据的正确方法是什么?

解决方案

Amazon Firehose 以这种方式将 JSON 消息转储到 S3,并且不允许您设置分隔符或任何内容,这让我感到难以置信.

最终,我发现解决问题的技巧是使用 JSON raw_decode 方法处理文本文件

这将允许您读取一堆串联的 JSON 记录,而它们之间没有任何分隔符.

Python 代码:

导入json解码器 = json.JSONDecoder()使用 open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') 作为 content_file:内容 = content_file.read()content_length = len(内容)解码索引 = 0而 decode_index <内容长度:尝试:obj, decode_index =decoder.raw_decode(content, decode_index)打印(文件索引:",decode_index)打印(对象)除了 JSONDecodeError 作为 e:print("JSONDecodeError:", e)# 向前扫描并继续尝试解码解码索引 += 1

I am writing record to Kinesis Firehose stream that is eventually written to a S3 file by Amazon Kinesis Firehose.

My record object looks like

ItemPurchase {
    String personId,
    String itemId
}

The data is written to S3 looks like:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}

NO COMMA SEPERATION.

NO STARTING BRACKET as in a Json Array

[

NO ENDING BRACKET as in a Json Array

]

I want to read this data get a list of ItemPurchase objects.

List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))

What is the correct way to read this data?

解决方案

It boggles my mind that Amazon Firehose dumps JSON messages to S3 in this manner, and doesn't allow you to set a delimiter or anything.

Ultimately, the trick I found to deal with the problem was to process the text file using the JSON raw_decode method

This will allow you to read a bunch of concatenated JSON records without any delimiters between them.

Python code:

import json

decoder = json.JSONDecoder()

with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:

    content = content_file.read()

    content_length = len(content)
    decode_index = 0

    while decode_index < content_length:
        try:
            obj, decode_index = decoder.raw_decode(content, decode_index)
            print("File index:", decode_index)
            print(obj)
        except JSONDecodeError as e:
            print("JSONDecodeError:", e)
            # Scan forward and keep trying to decode
            decode_index += 1

这篇关于读取由 Amazon Kinesis Firehose 流写入 s3 的数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆