Inferring Spark DataType from string literals


Problem description


I am trying to write a Scala function that can infer Spark DataTypes based on a provided input string:

import org.apache.spark.sql.types._

/**
 * Example:
 * ========
 * toSparkType("string")  =>    StringType
 * toSparkType("boolean") =>    BooleanType
 * toSparkType("date")    =>    DateType
 * etc.
 */
def toSparkType(inputType: String): DataType = {
    var dt: DataType = null

    if (matchesStringRegex(inputType)) {
        dt = StringType
    } else if (matchesBooleanRegex(inputType)) {
        dt = BooleanType
    } else if (matchesDateRegex(inputType)) {
        dt = DateType
    } else if (...) {
        ...
    }

    dt
}
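
For illustration, the matcher helpers the skeleton refers to (hypothetical names taken from the snippet above, not any Spark API) could be as simple as:

// Hypothetical matcher helpers for the skeleton above; the regexes are
// illustrative and only cover a few spellings of each type name.
private def matchesStringRegex(s: String): Boolean  = s.trim.toLowerCase.matches("str(ing)?|text")
private def matchesBooleanRegex(s: String): Boolean = s.trim.toLowerCase.matches("bool(ean)?")
private def matchesDateRegex(s: String): Boolean    = s.trim.toLowerCase.matches("date")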


My goal is to support a large subset, if not all, of the available DataTypes. As I started implementing this function, I got to thinking: "Spark/Scala probably already have a helper/util method that will do this for me." After all, I know I can do something like:

var structType = new StructType()

// StructType is immutable, so add() returns a new StructType;
// reassign the result rather than discarding it.
structType = structType.add("some_new_string_col", "string", true, Metadata.empty)
structType = structType.add("some_new_boolean_col", "boolean", true, Metadata.empty)
structType = structType.add("some_new_date_col", "date", true, Metadata.empty)


And Scala and/or Spark will implicitly convert my "string" argument to StringType, etc. So I ask: what magic can I do with either Spark or Scala to help me implement my converter method?

Recommended answer


Spark/Scala probably already have a helper/util method that will do this for me.


You're right. Spark already has its own schema and data-type inference code that it uses to infer the schema from underlying data sources (CSV, JSON, etc.). So you can look at that to implement your own (the actual implementation is marked private to Spark and is tied to RDD and internal classes, so it cannot be used directly from code outside of Spark, but it should give you a good idea of how to go about it).


Given that CSV is a flat format (while JSON can have nested structure), CSV schema inference is relatively more straightforward and should help you with the task you're trying to achieve above. So I will explain how CSV inference works (JSON inference just needs to take possibly nested structure into account, but the data-type inference is quite analogous).


With that prologue, the thing you want to have a look at is the CSVInferSchema object. In particular, look at the infer method, which takes an RDD[Array[String]] and infers the data type for each element of the array across the whole RDD. The way it does this is: it marks each field as NullType to begin with, and then, as it iterates over the next row of values (Array[String]) in the RDD, it updates the already inferred DataType to a new DataType if the new DataType is more specific. This happens here:

val rootTypes: Array[DataType] =
  tokenRdd.aggregate(startType)(inferRowType(options), mergeRowTypes)
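
Conceptually, that aggregation is a fold over the rows. A simplified sketch of the two functions involved (my own illustration, not Spark's private implementation) might look like this:

import org.apache.spark.sql.types._

// Widen each column's inferred type using the values of one row.
// inferField is the per-field function discussed below.
def inferRowType(inferField: (DataType, String) => DataType)
                (soFar: Array[DataType], row: Array[String]): Array[DataType] =
  soFar.zip(row).map { case (typeSoFar, value) => inferField(typeSoFar, value) }

// Combine per-partition results. Spark's real mergeRowTypes finds the
// narrowest compatible type; this sketch just falls back to StringType.
def mergeRowTypes(a: Array[DataType], b: Array[DataType]): Array[DataType] =
  a.zip(b).map {
    case (t1, t2) if t1 == t2 => t1
    case (NullType, t2)       => t2
    case (t1, NullType)       => t1
    case _                    => StringType
  }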


Now inferRowType calls inferField for each of the fields in the row. The inferField implementation is what you're probably looking for: it takes the type inferred so far for a particular field and the string value of that field in the current row as parameters. It then returns either the existing inferred type or, if the newly inferred type is more specific, the new type.

The relevant portion of the code is as follows:

typeSoFar match {
  case NullType => tryParseInteger(field, options)
  case IntegerType => tryParseInteger(field, options)
  case LongType => tryParseLong(field, options)
  case _: DecimalType => tryParseDecimal(field, options)
  case DoubleType => tryParseDouble(field, options)
  case TimestampType => tryParseTimestamp(field, options)
  case BooleanType => tryParseBoolean(field, options)
  case StringType => StringType
  case other: DataType =>
    throw new UnsupportedOperationException(s"Unexpected data type $other")
}


Please note that if typeSoFar is NullType, then it first tries to parse the value as an Integer, but the tryParseInteger call is the head of a chain of calls to progressively less specific type parsers. So if it is not able to parse the value as an Integer, it will invoke tryParseLong, which on failure will invoke tryParseDecimal, which on failure will invoke tryParseDouble, which on failure will invoke tryParseTimestamp, which on failure will invoke tryParseBoolean, which on failure finally falls back to StringType.
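
To make that fall-through concrete, here is a simplified single-value sketch of the chain (my own illustration, not Spark's private code; it omits the decimal step and the parse options for brevity):

import scala.util.Try
import org.apache.spark.sql.types._

// Each step tries to parse the value as its own type and falls through
// to the next, less specific parser on failure.
def tryParseInteger(field: String): DataType =
  if (Try(field.toInt).isSuccess) IntegerType else tryParseLong(field)

def tryParseLong(field: String): DataType =
  if (Try(field.toLong).isSuccess) LongType else tryParseDouble(field)

def tryParseDouble(field: String): DataType =
  if (Try(field.toDouble).isSuccess) DoubleType else tryParseTimestamp(field)

def tryParseTimestamp(field: String): DataType =
  if (Try(java.sql.Timestamp.valueOf(field)).isSuccess) TimestampType
  else tryParseBoolean(field)

def tryParseBoolean(field: String): DataType =
  if (Try(field.toBoolean).isSuccess) BooleanType else StringType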


So you can use pretty much the same logic to implement whatever your use case is. (If you do not need to merge across rows, then you simply implement all the tryParse* methods verbatim and invoke tryParseInteger. No need to write your own regex.)
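
Applied to the original question, the entry point would then just call the head of the chain (again illustrative, using the sketch above):

def inferSparkType(value: String): DataType = tryParseInteger(value)

inferSparkType("42")          // IntegerType
inferSparkType("3000000000")  // LongType (too big for Int)
inferSparkType("3.14")        // DoubleType
inferSparkType("true")        // BooleanType
inferSparkType("hello")       // StringType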

Hope this helps.

