Assigning columns to another columns in a Spark Dataframe using Scala
Question
I was looking at this excellent question and its answer so as to improve my Scala skills: Extract a column value and assign it to another column as an array in spark dataframe
I created my modified code as follows, which works, but I am left with a few questions:
import spark.implicits._
import org.apache.spark.sql.functions._
val df = sc.parallelize(Seq(
("r1", 1, 1),
("r2", 6, 4),
("r3", 4, 1),
("r4", 1, 2)
)).toDF("ID", "a", "b")
val uniqueVal = df.select("b").distinct().map(x => x.getAs[Int](0)).collect.toList
def myfun: Int => List[Int] = _ => uniqueVal
def myfun_udf = udf(myfun)
df.withColumn("X", myfun_udf( col("b") )).show
+---+---+---+---------+
| ID| a| b| X|
+---+---+---+---------+
| r1| 1| 1|[1, 4, 2]|
| r2| 6| 4|[1, 4, 2]|
| r3| 4| 1|[1, 4, 2]|
| r4| 1| 2|[1, 4, 2]|
+---+---+---+---------+
It works, but:

- I note that column b is referenced twice.
- I can also pass column a in the second statement and get the same result, for example:
  df.withColumn("X", myfun_udf(col("a"))).show
  What does that mean, then?
- If I pass col ID, the result is null.
- So, I wonder why the second col has to be passed in at all?
- And how could this be made to work generically for all columns?

So, this was code that I looked at elsewhere, but I am missing something.

Answer

The code you've shown doesn't make much sense:

- It is not scalable - in the worst case scenario the size of each row is proportional to the size
- As you've already figured out, it doesn't need an argument at all.
- It doesn't need (and, importantly, it didn't need) udf at the time it was written (on 2016-12-23, Spark 1.6 and 2.0 were already released). If you still wanted to use udf, a nullary variant would suffice.

Overall it is just another convoluted and misleading answer that served the OP at the time. I'd ignore it (or vote accordingly) and move on.

So how could this be done? If you have a local list and you really want to use udf, then for a single sequence use udf with a nullary function:
val uniqueBVal: Seq[Int] = ???
val addUniqueBValCol = udf(() => uniqueBVal)
df.withColumn("X", addUniqueBValCol())
Generalize to:
import scala.reflect.runtime.universe.TypeTag
def addLiteral[T : TypeTag](xs: Seq[T]) = udf(() => xs)
val x = addLiteral[Int](uniqueBVal)
df.withColumn("X", x())
Although it is better not to use udf:
import org.apache.spark.sql.functions._
df.withColumn("x", array(uniquBVal map lit: _*))
As of
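Not part of the quoted answer, but as a hedged aside: Spark 2.2 and later also provide typedLit, which can embed a whole Seq as a literal array column in a single call:

import org.apache.spark.sql.functions.typedLit

// assumes uniqueBVal is the Seq[Int] of distinct b values from above
df.withColumn("X", typedLit(uniqueBVal)).show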
And how could this be made to work generically for all columns?
As mentioned at the beginning, the whole concept is hard to defend. Use either window functions (completely not scalable):
import org.apache.spark.sql.expressions.Window
val w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.select($"*" +: df.columns.map(c => collect_set(c).over(w).alias(s"${c}_unique")): _*)
or a cross join with an aggregate (most of the time not scalable):
val uniqueValues = df.select(
df.columns map (c => collect_set(col(c)).alias(s"${c}_unique")):_*
)
df.crossJoin(uniqueValues)
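Here uniqueValues is a single-row DataFrame holding one collected set per column, and the cross join copies that row onto every row of df, so once again every row carries every distinct value.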
In general though, you'll have to rethink your approach if this comes anywhere near actual applications, unless you know for sure that the cardinalities of the columns are small and have strict upper bounds.
The take-away message is: don't trust random code that random people post on the Internet. This one included.