将基于json的日志转换为列格式,即每列一个文件 [英] converting json based log into column format, i.e., one file per column
本文介绍了将基于json的日志转换为列格式,即每列一个文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
日志文件示例:
{"timestamp": "2022-01-14T00:12:21.000", "Field1": 10, "Field_Doc": {"f1": 0}}
{"timestamp": "2022-01-18T00:15:51.000", "Field_Doc": {"f1": 0, "f2": 1.7, "f3": 2}}
它将生成5个文件:
- 时间戳.列
- Field1.Column
- field_Doc.f1.Column
- field_Doc.f2.Column
- field_Doc.f3.Column
列文件格式如下:
- 字符串字段之间用换行符分隔‘ ‘性格。假设没有新行的字符串值 字符,因此无需担心转义它们
- 双精度、整型和布尔型字段表示为每行一个值
- 空值、未定义的&;空字符串表示为空行
时间戳示例内容。列:
2022-01-14T00:12:21.000
2022-01-18T00:15:51.000
注意:日志中的字段将是动态的,不要假设这些是预期的属性
有人能告诉我怎么做吗,
日志文件大小约为4 GB到48 GB
推荐答案
如果每个JSON都在一行中,那么您可以open()
归档并使用for line in file:
逐行读取-接下来您可以使用模块json
将行转换为字典并进行处理。
您可以使用for key, value in data:
单独处理每个项目。您可以使用key
创建文件名f"{key}.column"
,并以追加模式"a"
打开它,然后在此文件中写入str(value) + "
"
。
因为您有嵌套的词典,所以您需要isinstance(value, dict)
检查您是否没有{"f1": 0, "f2": 1.7, "f3": 2}
,并为此词典重复代码-这可能需要使用递归。
最小工作代码。
我使用io
只是为了模拟内存中的文件,但您应该使用open(filename)
file_data = '''{"timestamp": "2022-01-14T00:12:21.000", "Field1": 10, "Field_Doc": {"f1": 0}}
{"timestamp": "2022-01-18T00:15:51.000", "Field_Doc": {"f1": 0, "f2": 1.7, "f3": 2}}'''
import json
# --- functions ---
def process_dict(data, prefix=""):
for key, value in data.items():
if prefix:
key = prefix + "." + key
if isinstance(value, dict):
process_dict(value, key)
else:
with open(key + '.column', "a") as f:
f.write(str(value) + "
")
# --- main ---
#file_obj = open("filename")
import io
file_obj = io.StringIO(file_data) # emulate file in memory
for line in file_obj:
data = json.loads(line)
print(data)
process_dict(data)
#process_dict(data, "some prefix for all files")
编辑:
更通用的版本-它将function
作为第三个参数,因此可以与不同的功能一起使用
file_data = '''{"timestamp": "2022-01-14T00:12:21.000", "Field1": 10, "Field_Doc": {"f1": 0}}
{"timestamp": "2022-01-18T00:15:51.000", "Field_Doc": {"f1": 0, "f2": 1.7, "f3": 2}}'''
import json
# --- functions ---
def process_dict(data, func, prefix=""):
for key, value in data.items():
if prefix:
key = prefix + "." + key
if isinstance(value, dict):
process_dict(value, func, key)
else:
func(key, value)
def write_func(key, value):
with open(key + '.column', "a") as f:
f.write(str(value) + "
")
# --- main ---
#file_obj = open("filename")
import io
file_obj = io.StringIO(file_data) # emulate file in memory
for line in file_obj:
data = json.loads(line)
print(data)
process_dict(data, write_func)
#process_dict(data, write_func, "some prefix for all files")
使其更通用的另一个想法是创建平面化词典和创建
的函数{'timestamp': '2022-01-14T00:12:21.000', 'Field1': 10, 'Field_Doc.f1': 0}
{'timestamp': '2022-01-18T00:15:51.000', 'Field_Doc.f1': 0, 'Field_Doc.f2': 1.7, 'Field_Doc.f3': 2}
及更高版本使用循环写入元素。
file_data = '''{"timestamp": "2022-01-14T00:12:21.000", "Field1": 10, "Field_Doc": {"f1": 0}}
{"timestamp": "2022-01-18T00:15:51.000", "Field_Doc": {"f1": 0, "f2": 1.7, "f3": 2}}'''
import json
# --- functions ---
def flatten_dict(data, prefix=""):
result = {}
for key, value in data.items():
if prefix:
key = prefix + "." + key
if isinstance(value, dict):
result.update( process_dict(value, key) )
else:
result[key] = value
#result.update( {key: value} )
return result
# --- main ---
#file_obj = open("filename")
import io
file_obj = io.StringIO(file_data) # emulate file in memory
for line in file_obj:
data = json.loads(line)
print('before:', data)
data = flatten_dict(data)
#data = flatten_dict(data, "some prefix for all items")
print('after :', data)
print('---')
for key, value in data.items():
with open(key + '.column', "a") as f:
f.write(str(value) + "
")
这篇关于将基于json的日志转换为列格式,即每列一个文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文