Streaming JSON parser
Question
I am looking to implement a streaming JSON parser for a very, very large JSON file (~1TB) that I'm unable to load into memory. One option is to use something like https://github.com/stedolan/jq to convert the file into json-newline-delimited, but I need to do various other things to each JSON object, which makes that approach less than ideal.
Given a very large json object, how would I be able to parse it object-by-object, similar to this approach in xml: https://www.ibm.com/developerworks/library/x-hiperfparse/index.html.
For example, in pseudocode:
with open('file.json', 'r') as f:
    json_str = ''
    for line in f:  # what if there are no newlines in the json obj?
        json_str += line
        if is_valid(json_str):
            obj = json.loads(json_str)
            do_something(obj)
            json_str = ''
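The pseudocode's worry about missing newlines can be handled with the standard library alone: json.JSONDecoder.raw_decode parses one value from the front of a string and reports where it ended, so concatenated objects can be consumed from a buffer regardless of line breaks. A minimal sketch, assuming the top-level values are objects (the helper name and chunk size are illustrative):

```python
import io
import json

def iter_json_objects(fp, chunk_size=65536):
    """Yield top-level JSON values from a stream of concatenated
    JSON, without assuming newline delimiters."""
    decoder = json.JSONDecoder()
    buf = ''
    while True:
        chunk = fp.read(chunk_size)
        if not chunk:
            break
        buf += chunk
        while True:
            buf = buf.lstrip()          # skip whitespace between values
            if not buf:
                break
            try:
                obj, end = decoder.raw_decode(buf)
            except ValueError:
                break                   # value incomplete: read more data
            yield obj
            buf = buf[end:]

# Example: three objects with no newline delimiters
stream = io.StringIO('{"a": 1}{"b": 2} {"c": 3}')
for obj in iter_json_objects(stream, chunk_size=4):
    print(obj)
```

The small chunk size in the example just exercises the "object split across reads" path; for a 1TB file you would use a much larger one.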
Additionally, I did not find jq -c to be particularly fast (ignoring memory considerations). For example, json.loads was just as fast as (and at times faster than) jq -c. I also tried ujson, but kept getting a corruption error which I believe was related to the file size.
# file size is 2.2GB
>>> import json,time
>>> t0=time.time();_=json.loads(open('20190201_itunes.txt').read());print (time.time()-t0)
65.6147990227
$ time cat 20190206_itunes.txt|jq -c '.[]' > new.json
real 1m35.538s
user 1m25.109s
sys 0m15.205s
Finally, here is an example 100KB json input which can be used for testing: https://hastebin.com/ecahufonet.json
Answer
If the file contains one large JSON object (either array or map), then per the JSON spec, you must read the entire object before you can access its components.
If for instance the file is an array of objects [ {...}, {...} ], then newline-delimited JSON is far more efficient, since you only have to keep one object in memory at a time and the parser only has to read one line before it can begin processing.
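To make that concrete, here is a small round trip (the file name records.ndjson is just an example): writing one json.dumps per line produces newline-delimited JSON, and reading it back holds only one object in memory at a time.

```python
import json

records = [
    {"timestamp": 1549480267882, "sensor_val": 1.61},
    {"timestamp": 1549480267883, "sensor_val": 9.28},
]

# Write newline-delimited JSON: one complete object per line
with open('records.ndjson', 'w') as out:
    for rec in records:
        out.write(json.dumps(rec) + '\n')

# Read it back: each iteration parses a single line, so only the
# current object lives in memory
with open('records.ndjson') as infile:
    for line in infile:
        obj = json.loads(line)
        print(obj['sensor_val'])
```

This only works because json.dumps never emits raw newlines inside a record, which is what makes line-at-a-time reading safe.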
If you need to keep track of some of the objects for later use during parsing, I suggest creating a dict to hold those specific records of running values as you iterate through the file.
Say you have the JSON
{"timestamp": 1549480267882, "sensor_val": 1.6103881016325283}
{"timestamp": 1549480267883, "sensor_val": 9.281329310309406}
{"timestamp": 1549480267883, "sensor_val": 9.357327083443344}
{"timestamp": 1549480267883, "sensor_val": 6.297722749124474}
{"timestamp": 1549480267883, "sensor_val": 3.566667175421604}
{"timestamp": 1549480267883, "sensor_val": 3.4251473635178655}
{"timestamp": 1549480267884, "sensor_val": 7.487766674770563}
{"timestamp": 1549480267884, "sensor_val": 8.701853236245032}
{"timestamp": 1549480267884, "sensor_val": 1.4070662393018396}
{"timestamp": 1549480267884, "sensor_val": 3.6524325449499995}
{"timestamp": 1549480455646, "sensor_val": 6.244199614422415}
{"timestamp": 1549480455646, "sensor_val": 5.126780276231609}
{"timestamp": 1549480455646, "sensor_val": 9.413894020722314}
{"timestamp": 1549480455646, "sensor_val": 7.091154829208067}
{"timestamp": 1549480455647, "sensor_val": 8.806417239029447}
{"timestamp": 1549480455647, "sensor_val": 0.9789474417767674}
{"timestamp": 1549480455647, "sensor_val": 1.6466189633300243}
You can process this with
import json
from collections import deque

# RingBuffer from https://www.daniweb.com/programming/software-development/threads/42429/limit-size-of-a-list
class RingBuffer(deque):
    def __init__(self, size):
        deque.__init__(self)
        self.size = size

    def full_append(self, item):
        deque.append(self, item)
        # full, pop the oldest item, left most item
        self.popleft()

    def append(self, item):
        deque.append(self, item)
        # max size reached, append becomes full_append
        if len(self) == self.size:
            self.append = self.full_append

    def get(self):
        """returns a list of size items (newest items)"""
        return list(self)

def proc_data():
    # Declare some state management in memory to keep track of whatever you want
    # as you iterate through the objects
    metrics = {
        'latest_timestamp': 0,
        'last_3_samples': RingBuffer(3)
    }
    with open('test.json', 'r') as infile:
        for line in infile:
            # Load each line
            line = json.loads(line)
            # Do stuff with your running metrics
            metrics['last_3_samples'].append(line['sensor_val'])
            if line['timestamp'] > metrics['latest_timestamp']:
                metrics['latest_timestamp'] = line['timestamp']
    return metrics

print(proc_data())