使用数据框触发 udf [英] spark udf with data frame

查看:28
本文介绍了使用数据框触发 udf的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的是 Spark 1.3.我有一个数据集,其中列(ordering_date 列)中的日期采用 yyyy/MM/dd 格式.我想对日期进行一些计算,因此我想使用 jodatime 进行一些转换/格式化.这是我拥有的 udf :

I am using Spark 1.3. I have a dataset where the dates in column (ordering_date column) are in yyyy/MM/dd format. I want to do some calculations with dates and therefore I want to use jodatime to do some conversions/formatting. Here is the udf that I have :

 val return_date = udf((str: String, dtf: DateTimeFormatter) => dtf.formatted(str))

这是调用 udf 的代码.但是,我收到错误提示不适用".我需要注册这个 UDF 还是我在这里遗漏了什么?

Here is the code where the udf is being called. However, I get error saying "Not Applicable". Do I need to register this UDF or am I missing something here?

val user_with_dates_formatted = users.withColumn(
  "formatted_date",
  return_date(users("ordering_date"), DateTimeFormat.forPattern("yyyy/MM/dd")
)

推荐答案

我不相信您可以将 DateTimeFormatter 作为参数传递给 UDF.您只能传入一个 Column.一种解决方案是:

I don't believe you can pass in the DateTimeFormatter as an argument to the UDF. You can only pass in a Column. One solution would be to do:

val return_date = udf((str: String, format: String) => {
  DateTimeFormat.forPatten(format).formatted(str))
})

然后:

val user_with_dates_formatted = users.withColumn(
  "formatted_date",
  return_date(users("ordering_date"), lit("yyyy/MM/dd"))
)

老实说——这个算法和你的原始算法都有同样的问题.它们都使用 forPattern 为每条记录解析 yyyy/MM/dd.更好的是创建一个包裹在 Map[String,DateTimeFormatter] 周围的单例对象,可能是这样的(完全未经测试,但你明白了):

Honestly, though -- both this and your original algorithms have the same problem. They both parse yyyy/MM/dd using forPattern for every record. Better would be to create a singleton object wrapped around a Map[String,DateTimeFormatter], maybe like this (thoroughly untested, but you get the idea):

object DateFormatters {
  var formatters = Map[String,DateTimeFormatter]()

  def getFormatter(format: String) : DateTimeFormatter = {
    if (formatters.get(format).isEmpty) {
      formatters = formatters + (format -> DateTimeFormat.forPattern(format))
    }
    formatters.get(format).get
  }
}

然后您将UDF更改为:

val return_date = udf((str: String, format: String) => {
  DateFormatters.getFormatter(format).formatted(str))
})

这样,DateTimeFormat.forPattern(...) 只被每个执行器按格式调用一次.

That way, DateTimeFormat.forPattern(...) is only called once per format per executor.

关于单例对象解决方案需要注意的一点是你不能在 spark-shell 中定义对象——你必须把它打包成一个 JAR 文件并使用 --jars 选项到 spark-shell 如果你想在 shell 中使用 DateFormatters 对象.

One thing to note about the singleton object solution is that you can't define the object in the spark-shell -- you have to pack it up in a JAR file and use the --jars option to spark-shell if you want to use the DateFormatters object in the shell.

这篇关于使用数据框触发 udf的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆