Reading the data written to S3 by Amazon Kinesis Firehose stream
Problem description
I am writing records to a Kinesis Firehose stream that are eventually written to an S3 file by Amazon Kinesis Firehose.
My record object looks like
ItemPurchase {
String personId,
String itemId
}
The data written to S3 looks like:
{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}
NO COMMA SEPARATION.
NO STARTING BRACKET as in a Json Array
[
NO ENDING BRACKET as in a Json Array
]
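Because the records are simply concatenated, a plain `json.loads` on the whole object body fails as soon as it hits the second record, which is why a special approach is needed. A minimal sketch of the failure (the sample string is illustrative):

```python
import json

# Three-field JSON objects concatenated with no delimiter, as Firehose writes them
raw = '{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}'

try:
    json.loads(raw)  # the parser rejects the extra data after the first object
except json.JSONDecodeError as e:
    print("JSONDecodeError:", e)
```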
I want to read this data and get a list of ItemPurchase objects.
List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))
What is the correct way to read this data?
It boggles my mind that Amazon Firehose dumps JSON messages to S3 in this manner, and doesn't allow you to set a delimiter or anything.
Ultimately, the trick I found to deal with the problem was to process the text file using the JSON `raw_decode` method.
This will allow you to read a bunch of concatenated JSON records without any delimiters between them.
Python code:
import json

decoder = json.JSONDecoder()
with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:
    content = content_file.read()

content_length = len(content)
decode_index = 0

while decode_index < content_length:
    try:
        obj, decode_index = decoder.raw_decode(content, decode_index)
        print("File index:", decode_index)
        print(obj)
    except json.JSONDecodeError as e:
        print("JSONDecodeError:", e)
        # Scan forward and keep trying to decode
        decode_index += 1
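The same decode loop can be wrapped into a small reusable generator; the function name `iter_concatenated_json` is my own, not from any library:

```python
import json

def iter_concatenated_json(content):
    """Yield each JSON value from a string of back-to-back JSON records."""
    decoder = json.JSONDecoder()
    index = 0
    while index < len(content):
        try:
            obj, index = decoder.raw_decode(content, index)
            yield obj
        except json.JSONDecodeError:
            index += 1  # skip one bad character and keep scanning

records = list(iter_concatenated_json(
    '{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}'))
print(records)  # one dict per concatenated record
```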