Reading nested JSON in Google Dataflow / Apache Beam
Question
It is possible to read unnested JSON files on Cloud Storage with Dataflow via:
p.apply("read logfiles", TextIO.Read.from("gs://bucket/*").withCoder(TableRowJsonCoder.of()));
If I just want to write those logs with minimal filtering to BigQuery, I can do so by using a DoFn like this one:
private static class Formatter extends DoFn<TableRow, TableRow> {
  @Override
  public void processElement(ProcessContext c) throws Exception {
    // .clone() since input is immutable
    TableRow output = c.element().clone();
    // remove misleading timestamp field
    output.remove("@timestamp");
    // set timestamp field by using the element's timestamp
    output.set("timestamp", c.timestamp().toString());
    c.output(output);
  }
}
However, I don't know how to access nested fields in the JSON file this way.
- If the TableRow contains a RECORD named r, is it possible to access its keys/values without further serialization/deserialization?
- If I need to serialize/deserialize myself with the Jackson library, does it make more sense to use the standard Coder of TextIO.Read instead of TableRowJsonCoder, to gain back some of the performance I lose this way?
Edit
The files are newline-delimited, and look something like this:
{"@timestamp":"2015-x", "message":"bla", "r":{"analyzed":"blub", "query": {"where":"9999"}}}
{"@timestamp":"2015-x", "message":"blub", "r":{"analyzed":"bla", "query": {"where":"1111"}}}
Answer
Your best bet is probably to do what you described in #2 and use Jackson directly. It makes the most sense to let the TextIO read do what it is built for -- reading lines from a file with the string coder -- and then use a DoFn to actually parse the elements. Something like the following:
PCollection<String> lines = pipeline
    .apply(TextIO.Read.from("gs://bucket/..."));
PCollection<TableRow> objects = lines
    .apply(ParDo.of(new DoFn<String, TableRow>() {
      @Override
      public void processElement(ProcessContext c) {
        String json = c.element();
        SomeObject object = /* parse json using Jackson, etc. */;
        TableRow row = /* create a table row from object */;
        c.output(row);
      }
    }));
Note that you could also do this using multiple ParDos.
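To make the parsing step concrete, here is a minimal, standalone sketch of pulling the nested fields out of one of the sample log lines with Jackson's tree API. The class name is made up for illustration, and wiring the result into a TableRow inside the DoFn is left as in the answer above:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NestedJsonParseSketch {
    public static void main(String[] args) throws Exception {
        // one of the sample log lines from the question
        String json = "{\"@timestamp\":\"2015-x\", \"message\":\"bla\", "
                + "\"r\":{\"analyzed\":\"blub\", \"query\": {\"where\":\"9999\"}}}";

        ObjectMapper mapper = new ObjectMapper();
        // readTree returns a JsonNode; path() never returns null,
        // so chained lookups are safe even if a field is missing
        JsonNode root = mapper.readTree(json);
        String message = root.path("message").asText();
        String where = root.path("r").path("query").path("where").asText();

        System.out.println(message); // bla
        System.out.println(where);   // 9999
    }
}
```

Since the whole document stays in memory as a JsonNode tree, this avoids defining a POJO per log schema; for fixed schemas, mapping with `mapper.readValue(json, SomeObject.class)` is the usual alternative.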