Reading/writing with Avro schemas AND Parquet format in SparkSQL


Problem Description

I'm trying to write and read Parquet files from SparkSQL. For reasons of schema evolution, I would like to use Avro schemas with my writes and reads.

My understanding is that this is possible outside of Spark (or manually within Spark) using e.g. AvroParquetWriter and Avro's Generic API. However, I would like to use SparkSQL's write() and read() methods (which work with DataFrameWriter and DataFrameReader) and which integrate well with SparkSQL (I will be writing and reading Datasets).
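
A minimal sketch of the non-Spark route mentioned above, using AvroParquetWriter with the Generic API (the schema file name, field name and output path here are placeholders, not from the question):

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter

// Placeholder schema file and field name -- not from the original question
val schema: Schema = new Schema.Parser().parse(new File("AvroData.avsc"))

val writer = AvroParquetWriter.builder[GenericRecord](new Path("/tmp/data.parquet"))
  .withSchema(schema)
  .build()

val record = new GenericData.Record(schema)
record.put("someField", "someValue")
writer.write(record)
writer.close()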

I can't for the life of me figure out how to do this, and am wondering if this is possible at all. The only options the SparkSQL parquet format seems to support are "compression" and "mergeSchema" -- i.e. no options for specifying an alternate schema format or alternate schema. In other words, it appears that there is no way to read/write Parquet files using Avro schemas using the SparkSQL API. But perhaps I'm just missing something?
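
To illustrate, these are the kinds of calls available (a sketch assuming a Spark 2.x SparkSession; paths and the codec are just examples). Nothing here accepts an Avro schema:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("parquet-options").getOrCreate()
import spark.implicits._

// Only generic Parquet options are accepted -- nothing that takes an Avro schema
Seq(("a", 1), ("b", 2)).toDF("name", "value")
  .write
  .option("compression", "snappy")
  .parquet("/tmp/parquet-out")

val readBack = spark.read
  .option("mergeSchema", "true")
  .parquet("/tmp/parquet-out")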

To clarify, I also understand that this will basically just add the Avro schema to the Parquet metadata on write, and will add one more translation layer on read (Parquet format -> Avro schema -> SparkSQL internal format) but will specifically allow me to add default values for missing columns (which Avro schema supports but Parquet schema does not).
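
As an illustration of that schema-evolution case, here is a hypothetical evolved schema (not one from the question) in which a new field carries a default, parsed with Avro's Schema.Parser:

import org.apache.avro.Schema

// Hypothetical evolved schema: "newField" carries a default, so a reader using this
// schema could fill the column in even for Parquet files written before it existed
val evolvedSchema = new Schema.Parser().parse(
  """{
    |  "type": "record",
    |  "name": "AvroData",
    |  "fields": [
    |    {"name": "existingField", "type": "string"},
    |    {"name": "newField", "type": "string", "default": ""}
    |  ]
    |}""".stripMargin)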

Also, I am not looking for a way to convert Avro to Parquet, or Parquet to Avro (rather a way to use them together), and I am not looking for a way to read/write plain Avro within SparkSQL (you can do this using databricks/spark-avro).
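
For contrast, the plain-Avro path that databricks/spark-avro already covers, and which is explicitly not what is being asked for, looks roughly like this (a sketch reusing the SparkSession from the earlier snippet; paths are placeholders):

// Plain Avro read/write via databricks/spark-avro -- not what the question is after
val avroDf = spark.read.format("com.databricks.spark.avro").load("/tmp/in.avro")
avroDf.write.format("com.databricks.spark.avro").save("/tmp/out.avro")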

Recommended Answer

I am doing something similar. I use an Avro schema to write into a Parquet file; however, I don't read it back as Avro. The same technique should work on reads as well. I'm not sure this is the best way to do it, but here it is anyway: I have AvroData.avsc, which holds the Avro schema.

// Direct Kafka stream of raw (key, value) byte payloads
val kafkaArr = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder,
  Tuple2[String, Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)


kafkaArr.foreachRDD { (rdd, time) =>
  // Convert the Avro schema into a Spark SQL StructType
  val schema = SchemaConverters.toSqlType(AvroData.getClassSchema).dataType.asInstanceOf[StructType]
  val ardd = rdd.mapPartitions { itr =>
    itr.map { r =>
      try {
        val cr = avroToListWithAudit(r._2, offsetSaved, loadDate, timeNow.toString)
        Row.fromSeq(cr.toArray)
      } catch {
        case e: Exception =>
          LogHandler.log.error("Exception while converting to Avro: " + e.getMessage)
          System.exit(-1)
          Row(0) // Only here to satisfy the compiler; on exception the application exits before this point
      }
    }
  }
}
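
The snippet above stops once ardd is built; presumably the remaining step, still inside the foreachRDD closure, is to wrap the Row RDD and the Avro-derived schema in a DataFrame and write it out as Parquet. A minimal sketch of that step -- the sqlContext, output path and save mode are assumptions, not part of the original answer:

// Still inside the foreachRDD closure, after ardd is built (sqlContext, path and mode are assumptions)
val df = sqlContext.createDataFrame(ardd, schema)
df.write.mode("append").parquet("/data/avro-backed-parquet")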


    // Attach audit fields (load date/time, Kafka offset) to the Avro object, then flatten it to a List
    public static List<Object> avroToListWithAudit(byte[] kfkBytes, String kfkOffset, String loaddate, String loadtime) throws IOException {
        AvroData av = getAvroData(kfkBytes);
        av.setLoaddate(loaddate);
        av.setLoadtime(loadtime);
        av.setKafkaOffset(kfkOffset);
        return avroToList(av);
    }



    // Flatten an Avro record into a List of column values in schema-field order
    // (nulls become empty strings, union values are stringified)
    public static List<Object> avroToList(AvroData a) throws UnsupportedEncodingException {
        List<Object> l = new ArrayList<>();
        for (Schema.Field f : a.getSchema().getFields()) {
            Object value = a.get(f.name());
            if (value == null) {
                l.add("");
            } else {
                switch (f.schema().getType().getName()) {
                    case "union":
                        l.add(value.toString());
                        break;
                    default:
                        l.add(value);
                        break;
                }
            }
        }
        return l;
    }

The getAvroData method needs to have code to construct the Avro object from the raw bytes. I am also trying to figure out a way to do that without having to specify each attribute setter explicitly, but it seems like there isn't one.

    public static AvroData getAvroData(byte[] kfkBytes) {
        AvroData av = AvroData.newBuilder().build();
        try {
            av.setAttr(String.valueOf("xyz"));
            .....
        }
    }
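
If the Kafka payload is plain Avro binary, one way to avoid setting every attribute by hand is to deserialize the bytes directly into the generated class with a SpecificDatumReader. A sketch only -- the assumption that the payload is un-framed Avro binary (no Confluent wire-format header) may not hold for every setup:

import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader

// Sketch: decode raw Kafka bytes straight into the generated AvroData class,
// assuming the payload is plain Avro binary
def getAvroData(kfkBytes: Array[Byte]): AvroData = {
  val reader = new SpecificDatumReader[AvroData](AvroData.getClassSchema)
  val decoder = DecoderFactory.get().binaryDecoder(kfkBytes, null)
  reader.read(null, decoder)
}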

Hope it helps.
