在Google Dataflow/Apache Beam中读取嵌套的JSON [英] Reading nested JSON in Google Dataflow / Apache Beam

查看:80
本文介绍了在Google Dataflow/Apache Beam中读取嵌套的JSON的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

可以通过以下方式通过Dataflow读取Cloud Storage上未嵌套的JSON文件:

It is possible to read unnested JSON files on Cloud Storage with Dataflow via:

p.apply("read logfiles", TextIO.Read.from("gs://bucket/*").withCoder(TableRowJsonCoder.of()));

如果我只想将经过最小过滤的日志写入BigQuery,可以使用类似以下的DoFn来做到这一点:

If I just want to write those logs with minimal filtering to BigQuery I can do so by using a DoFn like this one:

private static class Formatter extends DoFn<TableRow,TableRow> {

        @Override
        public void processElement(ProcessContext c) throws Exception {

            // .clone() since input is immutable
            TableRow output = c.element().clone();

            // remove misleading timestamp field
            output.remove("@timestamp");

            // set timestamp field by using the element's timestamp
            output.set("timestamp", c.timestamp().toString());

            c.output(output);
        }
    }
}

但是,我不知道如何以这种方式访问​​JSON文件中的嵌套字段.

However, I don't know how to access nested fields in the JSON file this way.

  1. 如果TableRow包含名为rRECORD,是否可以在不进行进一步序列化/反序列化的情况下访问其键/值?
  2. 如果我需要使用Jackson库对自己进行序列化/反序列化,那么使用TextIO.Read的标准Coder而不是TableRowJsonCoder有意义吗?这样松动吗?
  1. If the TableRow contains a RECORD named r, is it possible to access its keys/values without further serialization/deserialization?
  2. If I need to serialize/deserialize myself with the Jackson library, does it make more sense to use a the standard Coder of TextIO.Read instead of TableRowJsonCoder, to gain some of the performance back that I loose this way?

编辑

文件以换行符分隔,看起来像这样:

The files are new-line delimited, and look something like this:

{"@timestamp":"2015-x", "message":"bla", "r":{"analyzed":"blub", "query": {"where":"9999"}}}
{"@timestamp":"2015-x", "message":"blub", "r":{"analyzed":"bla", "query": {"where":"1111"}}}

推荐答案

您最好的选择可能是按照您在#2中所述的方法,直接使用Jackson.让TextIO读取所做的工作是最有意义的-使用字符串编码器读取文件中的行-然后使用DoFn实际解析元素.类似于以下内容:

Your best bet is probably to do what you described in #2 and use Jackson directly. It makes the most sense to let the TextIO read do what it is built for -- reading lines from a file with the string coder -- and then use a DoFn to actually parse the elements. Something like the following:

PCollection<String> lines = pipeline
  .apply(TextIO.from("gs://bucket/..."));
PCollection<TableRow> objects = lines
  .apply(ParDo.of(new DoFn<String, TableRow>() {
    @Override
    public void processElement(ProcessContext c) {
      String json = c.element();
      SomeObject object = /* parse json using Jackson, etc. */;
      TableRow row = /* create a table row from object */;
      c.output(row);
    }
  });

请注意,您也可以使用多个ParDo来执行此操作.

Note that you could also do this using multiple ParDos.

这篇关于在Google Dataflow/Apache Beam中读取嵌套的JSON的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆