Java: Read JSON from a file, convert to ORC and write to a file

Problem description

I need to automate the JSON-to-ORC conversion process. I was able to almost get there by using Apache's ORC-tools package, except that JsonReader doesn't handle the Map type and throws an exception. So, the following works but doesn't handle the Map type.

    Path hadoopInputPath = new Path(input);
    try (RecordReader recordReader = new JsonReader(hadoopInputPath, schema, hadoopConf)) { // throws when schema contains Map type
        try (Writer writer = OrcFile.createWriter(new Path(output), OrcFile.writerOptions(hadoopConf).setSchema(schema))) {
            VectorizedRowBatch batch = schema.createRowBatch();
            while (recordReader.nextBatch(batch)) {
                writer.addRowBatch(batch);
            }
        }
    }
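
For context, the failure shows up as soon as the schema handed to JsonReader contains a map type. Below is a minimal sketch of such a schema, built with org.apache.orc.TypeDescription; the field names are purely illustrative:

    // Hypothetical schema: the map<string,string> field is what JsonReader cannot handle
    TypeDescription schema = TypeDescription.fromString(
            "struct<id:bigint,name:string,attributes:map<string,string>>");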

So, I started looking into using Hive classes for the JSON-to-ORC conversion, which has the added advantage that in the future I can convert to other formats, such as Avro, with minor code changes. However, I am not sure of the best way to do this with the Hive classes. Specifically, it's not clear how to write an HCatRecord to a file, as shown below.

    HCatRecordSerDe hCatRecordSerDe = new HCatRecordSerDe();
    SerDeUtils.initializeSerDe(hCatRecordSerDe, conf, tblProps, null);

    OrcSerde orcSerde = new OrcSerde();
    SerDeUtils.initializeSerDe(orcSerde, conf, tblProps, null);

    Writable orcOut = orcSerde.serialize(hCatRecord, hCatRecordSerDe.getObjectInspector());
    assertNotNull(orcOut);

    InputStream input = getClass().getClassLoader().getResourceAsStream("test.json.snappy");
    SnappyCodec compressionCodec = new SnappyCodec();
    try (CompressionInputStream inputStream = compressionCodec.createInputStream(input)) {
        LineReader lineReader = new LineReader(new InputStreamReader(inputStream, Charsets.UTF_8));
        String jsonLine = null;
        while ((jsonLine = lineReader.readLine()) != null) {
            Writable jsonWritable = new Text(jsonLine);
            DefaultHCatRecord hCatRecord = (DefaultHCatRecord) jsonSerDe.deserialize(jsonWritable);
            // TODO: Write ORC to file????
        }
    }
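
One possible way to fill in that TODO, sketched here without having been tested, is to get a Hive RecordWriter from org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat and hand it the Writable that OrcSerde produces for each record (conf, output and tblProps are assumed to be the same objects used above):

    // Untested sketch: obtain a Hive RecordWriter that writes ORC
    FileSinkOperator.RecordWriter orcWriter = new OrcOutputFormat().getHiveRecordWriter(
            new JobConf(conf), new Path(output), Text.class, /* isCompressed */ false, tblProps, null);

    // ... inside the while loop, after deserializing each JSON line:
    Writable orcRow = orcSerde.serialize(hCatRecord, hCatRecordSerDe.getObjectInspector());
    orcWriter.write(orcRow);

    // ... and once the loop is done:
    orcWriter.close(false);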

Any ideas on how to complete the code above or simpler ways of doing JSON-to-ORC will be greatly appreciated.

Solution

Here is what I ended up doing using the Spark libraries, per cricket_007's suggestion:

Maven dependencies (with some exclusions to keep maven-duplicate-finder-plugin happy):

    <properties>
        <dep.jackson.version>2.7.9</dep.jackson.version>
        <spark.version>2.2.0</spark.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
        <version>${dep.jackson.version}</version>
        <exclusions>
            <exclusion>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>apache-log4j-extras</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
            </exclusion>
            <exclusion>
                <groupId>net.java.dev.jets3t</groupId>
                <artifactId>jets3t</artifactId>
            </exclusion>
            <exclusion>
                <groupId>com.google.code.findbugs</groupId>
                <artifactId>jsr305</artifactId>
            </exclusion>
            <exclusion>
                <groupId>stax</groupId>
                <artifactId>stax-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.objenesis</groupId>
                <artifactId>objenesis</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

Java code synopsis:

SparkConf sparkConf = new SparkConf()
    .setAppName("Converter Service")
    .setMaster("local[*]");

SparkSession sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate();

// read input data
Dataset<Row> events = sparkSession.read()
    .format("json")
    .schema(inputConfig.getSchema()) // StructType describing input schema
    .load(inputFile.getPath());

// write data out
events
    .selectExpr(
        // useful if you want to change the schema before writing it to ORC, e.g. ["`col1` as `FirstName`", "`col2` as `LastName`"]
        JavaConversions.asScalaBuffer(outputSchema.getColumns()))
    .write()
    .options(ImmutableMap.of("compression", "zlib"))
    .format("orc")
    .save(outputUri.getPath());
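
To sanity-check the result, the ORC output can be read back with the same session (a minimal sketch; show() just prints a handful of rows):

    // Optional check: read the ORC files back and print a few rows
    sparkSession.read()
        .format("orc")
        .load(outputUri.getPath())
        .show(10);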

Hope this helps somebody get started.
