Inferring Spark DataType from string literals


Question

I am trying to write a Scala function that can infer Spark DataTypes based on a provided input string:

/**
 * Example:
 * ========
 * toSparkType("string")  =>    StringType
 * toSparkType("boolean") =>    BooleanType
 * toSparkType("date")    =>    DateType
 * etc.
 */
def toSparkType(inputType : String) : DataType = {
    var dt : DataType = null

    if(matchesStringRegex(inputType)) {
        dt = StringType
    } else if(matchesBooleanRegex(inputType)) {
        dt = BooleanType
    } else if(matchesDateRegex(inputType)) {
        dt = DateType
    } else if(...) {
        ...
    }

    dt
}

My goal is to support a large subset, if not all, of the available DataTypes. As I started implementing this function, I got to thinking: "Spark/Scala probably already have a helper/util method that will do this for me." After all, I know I can do something like:

var structType = new StructType()

structType.add("some_new_string_col", "string", true, Metadata.empty)
structType.add("some_new_boolean_col", "boolean", true, Metadata.empty)
structType.add("some_new_date_col", "date", true, Metadata.empty)

And either Scala and/or Spark will implicitly convert my "string" argument to StringType, etc. So I ask: what magic can I do with either Spark or Scala to help me implement my converter method?

Solution

Spark/Scala probably already have a helper/util method that will do this for me.

You're right. Spark already has its own schema and data type inference code that it uses to infer the schema from underlying data sources (csv, json, etc.), so you can look at that to implement your own. (The actual implementation is marked private to Spark and is tied to RDDs and internal classes, so it cannot be used directly from code outside Spark, but it should give you a good idea of how to go about it.)

Given that csv is a flat format (whereas json can have nested structure), csv schema inference is relatively more straightforward and should help you with the task you're trying to achieve above. So I will explain how csv inference works (json inference just needs to take the possibly nested structure into account, but the data type inference is pretty analogous).

With that prologue, the thing you want to have a look at is the CSVInferSchema object. In particular, look at the infer method, which takes an RDD[Array[String]] and infers the data type for each element of the array across the whole RDD. The way it does this is: it marks each field as NullType to begin with, and then, as it iterates over the next row of values (Array[String]) in the RDD, it updates the already inferred DataType to a new DataType if the new DataType is more specific. This happens here:

val rootTypes: Array[DataType] =
      tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes)
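
To make the mechanics concrete, here is a minimal, self-contained sketch of that per-row aggregation written outside Spark's internals. It is not Spark's actual code: inferValue, merge and inferRowTypes are simplified stand-ins (assumptions of this sketch) for inferField, mergeRowTypes and the aggregate call above, and only a handful of types are handled.

import scala.util.Try
import org.apache.spark.sql.types._

object ToyInference {

  // Infer the most specific type a single string value can represent.
  private def inferValue(value: String): DataType =
    if (Try(value.toInt).isSuccess) IntegerType
    else if (Try(value.toDouble).isSuccess) DoubleType
    else if (Try(value.toBoolean).isSuccess) BooleanType
    else StringType

  // Merge the type seen so far with the type inferred from the current value:
  // equal types stay, NullType yields to anything, Integer widens to Double,
  // and anything else falls back to StringType.
  private def merge(soFar: DataType, next: DataType): DataType = (soFar, next) match {
    case (a, b) if a == b                                      => a
    case (NullType, b)                                         => b
    case (a, NullType)                                         => a
    case (IntegerType, DoubleType) | (DoubleType, IntegerType) => DoubleType
    case _                                                     => StringType
  }

  // Every column starts as NullType; each row can only make a column's type
  // more specific (or, on conflict, widen it towards StringType).
  def inferRowTypes(rows: Seq[Array[String]], numCols: Int): Array[DataType] =
    rows.foldLeft(Array.fill[DataType](numCols)(NullType)) { (typesSoFar, row) =>
      typesSoFar.zip(row).map { case (t, v) => merge(t, inferValue(v)) }
    }
}

// Example: the first column widens from Integer to Double, the second stays String.
// ToyInference.inferRowTypes(Seq(Array("1", "foo"), Array("2.5", "bar")), 2)
//   => Array(DoubleType, StringType)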

Now inferRowType calls inferField for each of the fields in the row. The inferField implementation is what you're probably looking for: it takes the type inferred so far for a particular field and the string value of that field in the current row as parameters. It then returns either the existing inferred type or, if the newly inferred type is more specific, the new type.

Relevant section of the code is as follows:

typeSoFar match {
  case NullType => tryParseInteger(field, options)
  case IntegerType => tryParseInteger(field, options)
  case LongType => tryParseLong(field, options)
  case _: DecimalType => tryParseDecimal(field, options)
  case DoubleType => tryParseDouble(field, options)
  case TimestampType => tryParseTimestamp(field, options)
  case BooleanType => tryParseBoolean(field, options)
  case StringType => StringType
  case other: DataType =>
    throw new UnsupportedOperationException(s"Unexpected data type $other")
}

Please note that if typeSoFar is NullType, it first tries to parse the value as an Integer, but the tryParseInteger call is itself a chain of calls to lower-priority type parsers. So if it is not able to parse the value as an Integer, it will invoke tryParseLong, which on failure will invoke tryParseDecimal, which on failure will invoke tryParseDouble, which on failure will invoke tryParseTimestamp, which on failure will invoke tryParseBoolean, which on failure finally falls back to StringType.

So you can use pretty much the same logic to implement whatever your use case is. (If you do not need to merge across rows, then you simply implement all the tryParse* methods verbatim and invoke tryParseInteger directly; there is no need to write your own regexes.)
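
For illustration, here is a hedged, standalone sketch of that chain of tryParse* fallbacks with no cross-row merging and no regexes. The method names deliberately mirror the Spark code discussed above, but this is my own simplified sketch rather than the library's API, and DecimalType is omitted for brevity.

import scala.util.Try
import org.apache.spark.sql.types._

object FieldTypeInference {

  // Entry point: start at the most specific parser and fall through to more
  // general ones, like the NullType case in the Spark snippet above.
  def inferField(field: String): DataType = tryParseInteger(field)

  private def tryParseInteger(field: String): DataType =
    if (Try(field.toInt).isSuccess) IntegerType else tryParseLong(field)

  private def tryParseLong(field: String): DataType =
    if (Try(field.toLong).isSuccess) LongType else tryParseDouble(field)

  private def tryParseDouble(field: String): DataType =
    if (Try(field.toDouble).isSuccess) DoubleType else tryParseTimestamp(field)

  private def tryParseTimestamp(field: String): DataType =
    if (Try(java.sql.Timestamp.valueOf(field)).isSuccess) TimestampType
    else tryParseBoolean(field)

  private def tryParseBoolean(field: String): DataType =
    if (Try(field.toBoolean).isSuccess) BooleanType else StringType
}

// Example usage:
//   FieldTypeInference.inferField("42")                  => IntegerType
//   FieldTypeInference.inferField("42.5")                => DoubleType
//   FieldTypeInference.inferField("2017-01-01 00:00:00") => TimestampType
//   FieldTypeInference.inferField("true")                => BooleanType
//   FieldTypeInference.inferField("hello")               => StringType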

Hope this helps.
