How to use DataFrame.explode with a custom UDF to split a string into substrings?


Question

I use Spark 1.5.

I have a DataFrame A_DF as follows:

+--------------------+--------------------+
|                  id|        interactions|
+--------------------+--------------------+
|        id1         |30439831,30447866...|
|        id2         |37597858,34499875...|
|        id3         |30447866,32896718...|
|        id4         |33029476,31988037...|
|        id5         |37663606,37627579...|
|        id6         |37663606,37627579...|
|        id7         |36922232,37675077...|
|        id8         |37359529,37668820...|
|        id9         |37675077,37707778...|
+--------------------+--------------------+

where interactions is a String. I want to explode this by first splitting the interactions string into a set of comma-separated substrings, which I try to do as follows:

val splitArr = udf { (s: String) => s.split(",").map(_.trim) }

val B_DF = A_DF.explode(splitArr($"interactions"))

but I get the following error:

error: missing arguments for method explode in class DataFrame;
follow this method with `_' if you want to treat it as a partially applied function A_DF.explode(splitArr($"interactions"))

which I don't understand. So I tried something even more complicated:

val B_DF = A_DF.explode($"interactions") { case Row(interactions: String) =>
    interactions.split(",").map(_.trim)
}

to which I am getting an inspection warning that reads:

Expression of Type Array[String] does not conform to expected type TraversableOnce[A_]

Any ideas?

Answer

Dataset.explode is deprecated as of Spark 2.0.0. Unless you have a reason, stay away from it. You've been warned.

If you do have a reason to use DataFrame.explode, see the signatures:

explode[A, B](inputColumn: String, outputColumn: String)(f: (A) ⇒ TraversableOnce[B])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[B]): DataFrame

explode[A <: Product](input: Column*)(f: (Row) ⇒ TraversableOnce[A])(implicit arg0: scala.reflect.api.JavaUniverse.TypeTag[A]): DataFrame

In either case, explode takes two parameter groups, hence the first error.
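The two-parameter-group shape is exactly what trips the first attempt: supplying only the first argument list yields a partially applied method, which is what the compiler complains about. A minimal plain-Scala sketch of the same situation (names are illustrative, no Spark required):

```scala
object CurriedExplode {
  // A method with two parameter groups, analogous to explode's signature
  def explodeLike[B](input: String)(f: String => TraversableOnce[B]): Seq[B] =
    f(input).toSeq

  def main(args: Array[String]): Unit = {
    // Supplying both argument lists compiles and runs:
    println(explodeLike("a,b,c")(_.split(",")))

    // Supplying only the first list, as in A_DF.explode(splitArr(...)),
    // is a "missing arguments" error unless you follow it with `_`:
    // val partial = explodeLike("a,b,c") _
  }
}
```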

(This is with Spark 2.1.0-SNAPSHOT.)

scala> spark.version
res1: String = 2.1.0-SNAPSHOT

scala> val A_DF = Seq(("id1", "30439831,30447866")).toDF("id", "interactions")
A_DF: org.apache.spark.sql.DataFrame = [id: string, interactions: string]

scala> A_DF.explode(split($"interactions", ","))
<console>:26: error: missing argument list for method explode in class Dataset
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `explode _` or `explode(_)(_)(_)` instead of `explode`.
       A_DF.explode(split($"interactions", ","))
                   ^

You could do it as follows (note the deprecation warning, since I use 2.1.0-SNAPSHOT):

scala> A_DF.explode[String, String]("interactions", "parts")(_.split(",")).show
warning: there was one deprecation warning; re-run with -deprecation for details
+---+-----------------+--------+
| id|     interactions|   parts|
+---+-----------------+--------+
|id1|30439831,30447866|30439831|
|id1|30439831,30447866|30447866|
+---+-----------------+--------+

You could use the other explode as follows:

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> case class Interaction(id: String, part: String)
defined class Interaction

scala> A_DF.explode[Interaction]($"id", $"interactions") { case Row(id: String, ins: String) => ins.split(",").map { it => Interaction(id, it) } }.show
warning: there was one deprecation warning; re-run with -deprecation for details
+---+-----------------+---+--------+
| id|     interactions| id|    part|
+---+-----------------+---+--------+
|id1|30439831,30447866|id1|30439831|
|id1|30439831,30447866|id1|30447866|
+---+-----------------+---+--------+

Use the explode function instead and you should be fine, as described in the scaladoc (quoted below):

Given that this method is deprecated, as an alternative, you can explode columns either using functions.explode():

ds.select(explode(split('words, " ")).as("word"))

or flatMap():

ds.flatMap(_.words.split(" "))
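Adapted to A_DF, the flatMap variant would look something like this (a sketch, assuming a live SparkSession named spark; the .as call needs the encoders from spark.implicits._):

```scala
import spark.implicits._

val B_DF = A_DF
  .as[(String, String)]   // (id, interactions)
  .flatMap { case (id, ins) => ins.split(",").map(part => (id, part.trim)) }
  .toDF("id", "part")
```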

You could then use the explode function as follows:

A_DF.select($"id", explode(split('interactions, ",")) as "part")
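Note that the alias belongs on the result of explode, not on the inner split column. A slightly fuller sketch that also trims whitespace from each part (column names as in the A_DF above):

```scala
import org.apache.spark.sql.functions.{explode, split, trim}

val B_DF = A_DF
  .select($"id", explode(split($"interactions", ",")).as("part"))
  .withColumn("part", trim($"part"))
```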
