How to add a new column with random values to an existing DataFrame in Scala


Problem description

I have a DataFrame read from a parquet file and I have to add a new column with some random data, but I need the random values to differ from each other. This is my actual code, and the current version of Spark is 1.5.1-cdh-5.5.2:

val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686 
mydf.cache

val r = scala.util.Random
import org.apache.spark.sql.functions.{lit, udf}
def myNextPositiveNumber :String = { (r.nextInt(Integer.MAX_VALUE) + 1 ).toString.concat("D")}
val myFunction = udf(myNextPositiveNumber _)
val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))

With this code, I get the following data:

scala> myNewDF.select("myNewColumn").show(10,false)
+-----------+
|myNewColumn|
+-----------+
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
|889488717D |
+-----------+

It looks like the udf myNextPositiveNumber is invoked only once, doesn't it?

Update: confirmed, there is only one distinct value:

scala> myNewDF.select("myNewColumn").distinct.show(50,false)
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
...

+-----------+                                                                   
|myNewColumn|
+-----------+
|889488717D |
+-----------+

What am I doing wrong?

Update 2: finally, with the help of @user6910411, I have this code:

val mydf = sqlContext.read.parquet("some.parquet")
// mydf.count()
// 63385686 
mydf.cache

val r = scala.util.Random

import org.apache.spark.sql.functions.{lit, udf}

val accum = sc.accumulator(1)

def myNextPositiveNumber():String = {
   accum+=1
   accum.value.toString.concat("D")
}

val myFunction = udf(myNextPositiveNumber _)

val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber))

myNewDF.select("myNewColumn").count

// 63385686

Update 3

The actual code generates data like this:

scala> mydf.select("myNewColumn").show(5,false)
17/02/22 11:01:57 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+-----------+
|myNewColumn|
+-----------+
|2D         |
|2D         |
|2D         |
|2D         |
|2D         |
+-----------+
only showing top 5 rows

It looks like the udf function is invoked only once, doesn't it? I need a new random element in that column.

Update 4 (@user6910411)

I have this code that increments the id, but it does not concatenate the final character, which is weird. This is my code:

import org.apache.spark.sql.functions.{expr, monotonically_increasing_id, udf}


val mydf = sqlContext.read.parquet("some.parquet")

mydf.cache

def myNextPositiveNumber():String = monotonically_increasing_id().toString().concat("D")

val myFunction = udf(myNextPositiveNumber _)

val myNewDF = mydf.withColumn("myNewColumn",expr(myNextPositiveNumber))

scala> myNewDF.select("myNewColumn").show(5,false)
17/02/22 12:00:02 WARN Executor: 1 block locks were not released by TID = 1:
[rdd_4_0]
+-----------+
|myNewColumn|
+-----------+
|0          |
|1          |
|2          |
|3          |
|4          |
+-----------+

I need something like this:

+-----------+
|myNewColumn|
+-----------+
|1D         |
|2D         |
|3D         |
|4D         |
+-----------+
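
For reference, a hedged sketch of what would produce that shape (assuming an increasing id is acceptable): the "D" suffix has to be appended as a Column expression with concat and lit, because String.concat in the code above only stringifies the Column object on the driver:

import org.apache.spark.sql.functions.{concat, lit, monotonically_increasing_id}

// Append the suffix per row as a Column expression. The generated ids are
// increasing but not necessarily consecutive across partitions.
val myNewDF = mydf.withColumn(
  "myNewColumn",
  concat((monotonically_increasing_id() + 1).cast("string"), lit("D"))
)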

Recommended answer

Spark >= 2.3

Some optimizations can be disabled using the asNondeterministic method:

import org.apache.spark.sql.expressions.UserDefinedFunction

val f: UserDefinedFunction = ???
val fNonDeterministic: UserDefinedFunction = f.asNondeterministic

Please make sure you understand the guarantees before using this option.
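
A minimal sketch under the question's setup (mydf as in the question; the Random-based generator itself is illustrative):

import org.apache.spark.sql.functions.udf
import scala.util.Random

// Marking the UDF nondeterministic stops Spark from folding the call into
// a constant, so it is evaluated once per row (Spark >= 2.3).
val randomSuffixed = udf(
  () => (Random.nextInt(Integer.MAX_VALUE) + 1).toString + "D"
).asNondeterministic()

val myNewDF = mydf.withColumn("myNewColumn", randomSuffixed())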

Spark < 2.3

A function passed to udf should be deterministic (with the possible exception of SPARK-20586), and calls to nullary functions can be replaced by constants. If you want to generate random numbers, use one of the built-in functions:

  • rand - Generate a random column with independent and identically distributed (i.i.d.) samples from U[0.0, 1.0].
  • randn - Generate a column with independent and identically distributed (i.i.d.) samples from the standard normal distribution.

and transform the output to obtain the required distribution, for example:

(rand * Integer.MAX_VALUE).cast("bigint").cast("string")
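
Put together under the question's setup (mydf and the "D" suffix come from the question), a sketch might look like this:

import org.apache.spark.sql.functions.{concat, lit, rand}

// rand() is evaluated per row, so each row gets its own draw; the values
// are i.i.d. random numbers, not guaranteed to be unique.
val myNewDF = mydf.withColumn(
  "myNewColumn",
  concat((rand() * Integer.MAX_VALUE).cast("bigint").cast("string"), lit("D"))
)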
