Evolving a schema with Spark DataFrame


Problem description

I'm working with a Spark DataFrame that could be loading data from one of three different schema versions:

// Original
{ "A": {"B": 1 } }
// Addition "C"
{ "A": {"B": 1 }, "C": 2 }
// Additional "A.D"
{ "A": {"B": 1, "D": 3 }, "C": 2 }

I can handle the additional "C" by checking whether the schema contains a field "C" and, if not, adding a new column to the DataFrame. However, I can't work out how to create a field for the sub-object ("A.D").

public void evolvingSchema() {
    String versionOne = "{ \"A\": {\"B\": 1 } }";
    String versionTwo = "{ \"A\": {\"B\": 1 }, \"C\": 2 }";
    String versionThree = "{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }";

    process(spark.getContext(), "1", versionOne);
    process(spark.getContext(), "2", versionTwo);
    process(spark.getContext(), "3", versionThree);
}

private static void process(JavaSparkContext sc, String version, String data) {
    SQLContext sqlContext = new SQLContext(sc);
    DataFrame df = sqlContext.read().json(sc.parallelize(Arrays.asList(data)));
    if(!Arrays.asList(df.schema().fieldNames()).contains("C")) {
        df = df.withColumn("C", org.apache.spark.sql.functions.lit(null));
    }
    // Not sure what to put here. The fieldNames does not contain the "A.D"

    try {
        df.select("C").collect();
    } catch(Exception e) {
        System.out.println("Failed to select C for version " + version);
    }
    try {
        df.select("A.D").collect();
    } catch(Exception e) {
        System.out.println("Failed to select A.D for version " + version);
    }
}
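To illustrate the goal outside Spark: the three schema versions can be reconciled by filling every missing field, including the nested "A.D", with null before processing. A minimal stdlib-only Python sketch (`normalize` and `TEMPLATE` are hypothetical names for illustration, not part of any Spark API):

```python
import json

# Target shape shared by all three schema versions; None marks a missing value.
TEMPLATE = {"A": {"B": None, "D": None}, "C": None}

def normalize(doc, template):
    """Recursively fill `doc` so it contains every key in `template`."""
    out = {}
    for key, default in template.items():
        value = doc.get(key)
        if isinstance(default, dict):
            # Nested object: recurse, treating a missing object as empty.
            out[key] = normalize(value or {}, default)
        else:
            out[key] = value  # None when the field is absent
    return out

v1 = normalize(json.loads('{ "A": {"B": 1 } }'), TEMPLATE)
v3 = normalize(json.loads('{ "A": {"B": 1, "D": 3 }, "C": 2 }'), TEMPLATE)
# v1 == {"A": {"B": 1, "D": None}, "C": None}
# v3 == {"A": {"B": 1, "D": 3}, "C": 2}
```

After normalisation every record, whatever version it came from, has the same structure, which is exactly the property the accepted answer achieves inside Spark with an explicit schema.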

Solution

JSON sources are not well suited to data with an evolving schema (consider Avro or Parquet instead), but the simple solution is to use the same schema for all sources and make the new fields optional / nullable:

import org.apache.spark.sql.types.{StructType, StructField, LongType}

val schema = StructType(Seq(
  StructField("A", StructType(Seq(
    StructField("B", LongType, true), 
    StructField("D", LongType, true)
  )), true),
  StructField("C", LongType, true)))
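If hand-writing the unified StructType becomes impractical as versions accumulate, the union shape can also be derived from sample records. A stdlib-only Python sketch of recursive schema merging (`merge_schemas` is a hypothetical helper, not a Spark API; Spark's own JSON reader performs a similar union when inferring a schema over mixed documents):

```python
def merge_schemas(a, b):
    """Merge two dict-shaped samples; overlapping dict-valued keys merge recursively."""
    merged = dict(a)
    for key, value in b.items():
        if key in merged and isinstance(merged[key], dict) and isinstance(value, dict):
            merged[key] = merge_schemas(merged[key], value)
        else:
            merged.setdefault(key, value)  # keep the first-seen value for scalars
    return merged

samples = [
    {"A": {"B": 1}},
    {"A": {"B": 1}, "C": 2},
    {"A": {"B": 1, "D": 3}, "C": 2},
]
schema = {}
for s in samples:
    schema = merge_schemas(schema, s)
# schema == {"A": {"B": 1, "D": 3}, "C": 2} -- its keys describe the union of all fields
```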

You can pass a schema like this to the DataFrameReader:

val rddV1 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 } }"))
val df1 = sqlContext.read.schema(schema).json(rddV1)

val rddV2 = sc.parallelize(Seq("{ \"A\": {\"B\": 1 }, \"C\": 2 }"))
val df2 = sqlContext.read.schema(schema).json(rddV2)

val rddV3 = sc.parallelize(Seq("{ \"A\": {\"B\": 1, \"D\": 3 }, \"C\": 2 }"))
val df3 = sqlContext.read.schema(schema).json(rddV3)

and you'll get a consistent structure independent of the variant:

require(df1.schema == df2.schema && df2.schema == df3.schema)

with missing columns automatically set to null:

df1.printSchema
// root
//  |-- A: struct (nullable = true)
//  |    |-- B: long (nullable = true)
//  |    |-- D: long (nullable = true)
//  |-- C: long (nullable = true)

df1.show
// +--------+----+
// |       A|   C|
// +--------+----+
// |[1,null]|null|
// +--------+----+

df2.show
// +--------+---+
// |       A|  C|
// +--------+---+
// |[1,null]|  2|
// +--------+---+

df3.show
// +-----+---+
// |    A|  C|
// +-----+---+
// |[1,3]|  2|
// +-----+---+
