Streaming JSON parser

Problem description

I am looking to implement a streaming JSON parser for a very, very large JSON file (~1TB) that I'm unable to load into memory. One option is to use something like https://github.com/stedolan/jq to convert the file into newline-delimited JSON, but there are various other things I need to do to each JSON object, which makes this approach not ideal.

Given a very large JSON object, how would I be able to parse it object-by-object, similar to this approach for XML: https://www.ibm.com/developerworks/library/x-hiperfparse/index.html

For example, in pseudocode:

import json

with open('file.json', 'r') as f:
    json_str = ''
    for line in f:  # what if there are no newlines in the JSON object?
        json_str += line
        if is_valid(json_str):        # placeholder check for a complete object
            obj = json.loads(json_str)
            do_something(obj)         # placeholder per-object processing
            json_str = ''
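
One way to handle the inline concern about missing newlines is to buffer raw text and use the standard library's json.JSONDecoder.raw_decode to pull out each complete value as soon as it is parseable. This is only a minimal sketch: it assumes the file is a sequence of concatenated or whitespace-separated top-level JSON values rather than one single huge array, and the iter_objects helper is a hypothetical name, not something from the original post.

import json

decoder = json.JSONDecoder()

def iter_objects(path, chunk_size=1 << 20):
    """Yield complete top-level JSON values from a file, without relying on newlines."""
    buf = ''
    with open(path, 'r') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            buf += chunk
            while True:
                buf = buf.lstrip()  # raw_decode does not skip leading whitespace
                if not buf:
                    break
                try:
                    obj, end = decoder.raw_decode(buf)
                except json.JSONDecodeError:
                    break  # value is still incomplete; read more data first
                yield obj
                buf = buf[end:]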

Additionally, I did not find jq -c to be particularly fast (ignoring memory considerations). For example, doing json.loads was just as fast as, and in fact a bit faster than, using jq -c. I tried using ujson as well, but kept getting a corruption error which I believe was related to the file size.

# file size is 2.2GB
>>> import json,time
>>> t0=time.time();_=json.loads(open('20190201_itunes.txt').read());print (time.time()-t0)
65.6147990227

$ time cat 20190206_itunes.txt|jq -c '.[]' > new.json
real    1m35.538s
user    1m25.109s
sys 0m15.205s

Finally, here is an example 100KB JSON input which can be used for testing: https://hastebin.com/ecahufonet.json

Recommended answer

If the file contains one large JSON object (either an array or a map), then, per the JSON spec, you must read the entire object before you can access its components.

If, for instance, the file is an array of objects [ {...}, {...} ], then newline-delimited JSON is far more efficient, since you only have to keep one object in memory at a time and the parser only has to read one line before it can begin processing.

If you need to keep track of some of the objects for later use during parsing, I suggest creating a dict to hold those running values as you iterate the file.

Say you have the following JSON:

{"timestamp": 1549480267882, "sensor_val": 1.6103881016325283}
{"timestamp": 1549480267883, "sensor_val": 9.281329310309406}
{"timestamp": 1549480267883, "sensor_val": 9.357327083443344}
{"timestamp": 1549480267883, "sensor_val": 6.297722749124474}
{"timestamp": 1549480267883, "sensor_val": 3.566667175421604}
{"timestamp": 1549480267883, "sensor_val": 3.4251473635178655}
{"timestamp": 1549480267884, "sensor_val": 7.487766674770563}
{"timestamp": 1549480267884, "sensor_val": 8.701853236245032}
{"timestamp": 1549480267884, "sensor_val": 1.4070662393018396}
{"timestamp": 1549480267884, "sensor_val": 3.6524325449499995}
{"timestamp": 1549480455646, "sensor_val": 6.244199614422415}
{"timestamp": 1549480455646, "sensor_val": 5.126780276231609}
{"timestamp": 1549480455646, "sensor_val": 9.413894020722314}
{"timestamp": 1549480455646, "sensor_val": 7.091154829208067}
{"timestamp": 1549480455647, "sensor_val": 8.806417239029447}
{"timestamp": 1549480455647, "sensor_val": 0.9789474417767674}
{"timestamp": 1549480455647, "sensor_val": 1.6466189633300243}

You can process this with:

import json
from collections import deque

# RingBuffer from https://www.daniweb.com/programming/software-development/threads/42429/limit-size-of-a-list
class RingBuffer(deque):
    def __init__(self, size):
        deque.__init__(self)
        self.size = size

    def full_append(self, item):
        deque.append(self, item)
        # full, pop the oldest item, left most item
        self.popleft()

    def append(self, item):
        deque.append(self, item)
        # max size reached, append becomes full_append
        if len(self) == self.size:
            self.append = self.full_append

    def get(self):
        """returns a list of size items (newest items)"""
        return list(self)


def proc_data():
    # Declare some state management in memory to keep track of whatever you want
    # as you iterate through the objects
    metrics = {
        'latest_timestamp': 0,
        'last_3_samples': RingBuffer(3)
    }

    with open('test.json', 'r') as infile:        
        for line in infile:
            # Load each line
            line = json.loads(line)
            # Do stuff with your running metrics
            metrics['last_3_samples'].append(line['sensor_val'])
            if line['timestamp'] > metrics['latest_timestamp']:
                metrics['latest_timestamp'] = line['timestamp']

    return metrics

print(proc_data())
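
Run against the sample data above, this should report a latest_timestamp of 1549480455647, with the ring buffer holding the last three sensor_val readings from the file.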
