Reading/writing with Avro schemas AND Parquet format in SparkSQL


Problem description


I'm trying to write and read Parquet files from SparkSQL. For reasons of schema evolution, I would like to use Avro schemas with my writes and reads.


My understanding is that this is possible outside of Spark (or manually within Spark) using, e.g., AvroParquetWriter and Avro's Generic API. However, I would like to use SparkSQL's write() and read() methods (which work with DataFrameWriter and DataFrameReader) and which integrate well with SparkSQL (I will be writing and reading Datasets).
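
For reference, this is roughly what I mean by the "outside of Spark" route, using parquet-avro directly. The class names come from org.apache.parquet.avro; the schema file, path, and field names below are placeholders (a sketch, not my actual code):

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter, AvroReadSupport}

val avroSchema: Schema = new Schema.Parser().parse(new java.io.File("AvroData.avsc"))
val path = new Path("/tmp/avro_backed.parquet") // illustrative path

// Write: the Avro schema is stored in the Parquet file's footer metadata
val writer = AvroParquetWriter.builder[GenericRecord](path).withSchema(avroSchema).build()
val rec: GenericRecord = new GenericData.Record(avroSchema)
rec.put("attr", "xyz") // illustrative field
writer.write(rec)
writer.close()

// Read: hand the (possibly newer) Avro schema to the reader so that fields
// missing from older files are filled in from their Avro defaults
val conf = new Configuration()
AvroReadSupport.setAvroReadSchema(conf, avroSchema)
val reader = AvroParquetReader.builder[GenericRecord](path).withConf(conf).build()
val readBack: GenericRecord = reader.read() // returns null when the file is exhausted
reader.close()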


I can't for the life of me figure out how to do this, and am wondering whether it is possible at all. The only options the SparkSQL Parquet format seems to support are "compression" and "mergeSchema" -- i.e. no option for specifying an alternate schema format or an alternate schema. In other words, there appears to be no way to read/write Parquet files with Avro schemas through the SparkSQL API. But perhaps I'm just missing something?
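
For comparison, the plain SparkSQL API I would like to keep using looks like this -- only generic options such as "compression" and "mergeSchema" are available. A sketch; ds, spark, and the paths are placeholders, and on Spark 1.x you would go through a SQLContext instead of a SparkSession:

// Write a Dataset/DataFrame as Parquet; no Avro-related option exists here
ds.write
  .option("compression", "snappy")
  .parquet("/data/out")

// Read it back; "mergeSchema" is the only schema-related knob
val df = spark.read
  .option("mergeSchema", "true")
  .parquet("/data/out")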


To clarify, I also understand that this would basically just add the Avro schema to the Parquet metadata on write, and would add one more translation layer on read (Parquet format -> Avro schema -> SparkSQL internal format), but it would specifically allow me to add default values for missing columns (which Avro schemas support but Parquet schemas do not).
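
For example, a hypothetical evolved version of the schema could add a field with an Avro default, so that files written before the field existed can still be read (the record and field names below are made up for illustration):

import org.apache.avro.Schema

// "loadtime" is a later addition with a default, so older records without it
// can still be deserialized; Avro fills in "" on read
val newerSchema: Schema = new Schema.Parser().parse(
  """{"type": "record", "name": "AvroData", "fields": [
    |  {"name": "attr",     "type": "string"},
    |  {"name": "loadtime", "type": "string", "default": ""}
    |]}""".stripMargin)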


Also, I am not looking for a way to convert Avro to Parquet, or Parquet to Avro (rather a way to use them together), and I am not looking for a way to read/write plain Avro within SparkSQL (you can do this using databricks/spark-avro).

Recommended answer


I am doing something similar. I use an Avro schema to write into the Parquet file; however, I don't read it back as Avro. The same technique should work on read as well. I'm not sure this is the best way to do it, but here it is anyway: I have AvroData.avsc, which holds the Avro schema.

// Direct Kafka stream; each record arrives as a (key, raw Avro bytes) pair
val kafkaArr = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder, Tuple2[String, Array[Byte]]](ssc, kafkaProps, fromOffsets, messageHandler)


kafkaArr.foreachRDD { (rdd, time) =>
  // Convert the Avro schema into a Spark SQL StructType
  val schema = SchemaConverters.toSqlType(AvroData.getClassSchema).dataType.asInstanceOf[StructType]
  val ardd = rdd.mapPartitions { itr =>
    itr.map { r =>
      try {
        // r._2 holds the raw Avro bytes from Kafka
        val cr = avroToListWithAudit(r._2, offsetSaved, loadDate, timeNow.toString)
        Row.fromSeq(cr.toArray)
      } catch {
        case e: Exception =>
          LogHandler.log.error("Exception while converting to Avro" + e.printStackTrace())
          System.exit(-1)
          Row(0) // Only to satisfy the compiler; the application exits before this point
      }
    }
  }
  // ... (see the continuation sketched below)
}
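
The snippet above stops before the actual write. Presumably, still inside foreachRDD, the converted rows and the Avro-derived schema are combined into a DataFrame and written out as Parquet. A sketch of that missing step (the sqlContext and the output path are assumptions, not shown in the original code):

// Assumed continuation, still inside foreachRDD: build a DataFrame from the
// converted rows using the schema derived from the Avro .avsc, then write Parquet
val df = sqlContext.createDataFrame(ardd, schema)
df.write.mode("append").parquet("/data/avro_backed_parquet") // illustrative path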


// Deserialize the Kafka payload, stamp audit columns, then flatten it to row values
public static List<Object> avroToListWithAudit(byte[] kfkBytes, String kfkOffset, String loaddate, String loadtime) throws IOException {
    AvroData av = getAvroData(kfkBytes);
    av.setLoaddate(loaddate);
    av.setLoadtime(loadtime);
    av.setKafkaOffset(kfkOffset);
    return avroToList(av);
}



// Flatten an AvroData record into a List of values, in schema field order,
// so it can be turned into a Spark SQL Row
public static List<Object> avroToList(AvroData a) throws UnsupportedEncodingException {
    List<Object> l = new ArrayList<>();
    for (Schema.Field f : a.getSchema().getFields()) {
        Object value = a.get(f.name());
        if (value == null) {
            l.add(""); // represent missing values as empty strings
        } else {
            switch (f.schema().getType().getName()) {
                case "union": // unions are stringified rather than unwrapped
                    l.add(value.toString());
                    break;
                default:
                    l.add(value);
                    break;
            }
        }
    }
    return l;
}


The getAvroData method needs code to construct the Avro object from the raw bytes. I am also trying to figure out a way to do that without having to specify each attribute setter explicitly, but it seems there isn't one.

public static AvroData getAvroData(byte[] kfkBytes) {
    AvroData av = AvroData.newBuilder().build();
    av.setAttr(String.valueOf("xyz"));
    // ..... one explicit setter per attribute, parsed out of kfkBytes
    return av;
}
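
For what it's worth, a common way to rebuild the object from the raw bytes without writing one setter per attribute is Avro's own binary decoder. A sketch in Scala, assuming the Kafka payload is plain Avro binary (no Confluent wire-format prefix) and that AvroData is the Avro-generated class:

import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumReader

// Deserialize the raw Kafka bytes straight into the generated AvroData class;
// SpecificDatumReader uses the schema compiled into AvroData, so no per-field
// setters are needed
def getAvroData(kfkBytes: Array[Byte]): AvroData = {
  val reader  = new SpecificDatumReader[AvroData](classOf[AvroData])
  val decoder = DecoderFactory.get().binaryDecoder(kfkBytes, null)
  reader.read(null, decoder)
}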

Hope it helps.
