在Scala中将Spark Dataframe(带有WrappedArray)转换为RDD [labelPoint] [英] Converting Spark Dataframe(with WrappedArray) to RDD[labelPoint] in scala

查看:522
本文介绍了在Scala中将Spark Dataframe(带有WrappedArray)转换为RDD [labelPoint]的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是Scala的新手,我想将数据帧转换为rdd。让标签,功能转换为 RDD [labelPoint] 作为MLlib的输入。但是我找不到解决方法 WrappedArray

  scala> test.printSchema 
root
|-user_id:long(可空= true)
|-brand_store_sn:字符串(nullable = true)
|-label:整数(nullable = true)
|-money_score:双倍(nullable = true)
|-normal_score:双倍(nullable = true)
|-action_score:双倍(nullable = true)
|-功能:数组(可空= true)
| |-元素:字符串(containsNull = true)
|-标志:字符串(nullable = true)
|-dt:字符串(nullable = true)


scala> test.head
res21:org.apache.spark.sql.Row = [2533,10005072,1,2.0,1.0,1.0,WrappedArray([ d90_pv_1sec:1.4471580313422192, d3_pv_1sec:0.9030899869919435, d7_pv_1sec: 0.9030899869919435, d30_pv_1sec:1.414973347970818, d90_pv_week_decay:1.4235871662780681, d1_pv_1sec:0.9030899869919435, d120_pv_1sec:1.4471580313422192]),user_positive,20161130] $ b $ c

解决方案

首先-因为 LabeledPoint 期望矢量为 Double ,我假设您还想按冒号(),并将其右侧视为双精度型,例如:

  d90_pv_1sec:1.4471580313422192 -> 1.4471580313422192 

如果是-这是转换:

  import org.apache.spark.mllib.linalg。{Vector,Vectors} 
import org.apache.spark.mllib.regression.LabeledPoint

//示例数据-具有标签,特征和其他列的DataFrame
val df = Seq(
(1,Array( d90_pv_1sec:1.4471580313422192, d3_pv_1sec:0.9030899869919435),4.0),
(2,Array( d7_pv_1sec:0.9030899869919435, d30_pv_1sec:1.414973347970818),5.0)
).toDF( label, features, ignored)

//从Row中提取相关字段,并将WrappedArray [String]转换为Vector:
val result = df.rdd.map(r => {
val label = r.getAs [Int]( label )
val featuresArray = r.getAs [mutable.WrappedArray [String]]( features)
val features:Vector = Vectors.dense(
featuresArray.map(_。split( :)(1).toDouble).toArray

LabeledPoint(label,features)
})

result.foreach(println)
/ /(1.0,[1.4471580313422192,0.9030899869919435])
//(2.0,[0.9030899869919435,1.414973347970818])

编辑:为明确起见,现在假设输入数组中的每个项目在生成的稀疏向量中都包含预期的 index

  d90_pv_1sec:1.4471580313422192->指数= 90;值= 1.4471580313422192 

修改后的代码为:

  val vectorSize:Int = 100 //只是一个猜测-应该是最大索引+ 1 

val结果= df.rdd.map(r = > {
val label = r.getAs [Int]( label)
val arr = r.getAs [mutable.WrappedArray [String]]( features)。toArray
//将每个项目解析为(索引,值)元组,以在稀疏向量
中使用val elements = arr.map(_。split(:))。map {
case Array(s,d )=>(s.replaceAll( d | _pv_1sec,)。toInt,d.toDouble)
}
LabeledPoint(label,Vectors.sparse(vectorSize,elements))
}}

result.foreach(println)
//(1.0,(100,[3,90],[0.9030899869919435,1.4471580313422192]))
//(2.0 ,(100,[7,30],[0.9030899869919435,1.414973347970818]))

注意:使用 s.replaceAll( d | _pv_1sec,)可能会有点慢,因为它会分别为每个项目编译一个正则表达式。如果是这样,可以用更快(但更难看)的 s.replace( d,).replace( _ pv_1sec,)代替不使用正则表达式。


I am new to Scala and I want to convert dataframe to rdd. let the label, features convert to RDD[labelPoint] for the input of MLlib. But I can't find out the way to deal with WrappedArray.

scala> test.printSchema
root
 |-- user_id: long (nullable = true)
 |-- brand_store_sn: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- money_score: double (nullable = true)
 |-- normal_score: double (nullable = true)
 |-- action_score: double (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- flag: string (nullable = true)
 |-- dt: string (nullable = true)


scala> test.head
res21: org.apache.spark.sql.Row = [2533,10005072,1,2.0,1.0,1.0,WrappedArray(["d90_pv_1sec:1.4471580313422192", "d3_pv_1sec:0.9030899869919435", "d7_pv_1sec:0.9030899869919435", "d30_pv_1sec:1.414973347970818", "d90_pv_week_decay:1.4235871662780681", "d1_pv_1sec:0.9030899869919435", "d120_pv_1sec:1.4471580313422192"]),user_positive,20161130]

解决方案

First - since LabeledPoint expects a Vector of Doubles, I'm assuming you also want to split each element in every features array by colon (:), and treat the right-hand side of it as the double, e.g.:

 "d90_pv_1sec:1.4471580313422192" --> 1.4471580313422192

If so - here's the transformation:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

// sample data - DataFrame with label, features and other columns
val df = Seq(
  (1, Array("d90_pv_1sec:1.4471580313422192", "d3_pv_1sec:0.9030899869919435"), 4.0),
  (2, Array("d7_pv_1sec:0.9030899869919435", "d30_pv_1sec:1.414973347970818"), 5.0)
).toDF("label", "features", "ignored")

// extract relevant fields from Row and convert WrappedArray[String] into Vector:
val result = df.rdd.map(r => {
  val label = r.getAs[Int]("label")
  val featuresArray = r.getAs[mutable.WrappedArray[String]]("features")
  val features: Vector = Vectors.dense(
    featuresArray.map(_.split(":")(1).toDouble).toArray
  )
  LabeledPoint(label, features)
})

result.foreach(println)
// (1.0,[1.4471580313422192,0.9030899869919435])
// (2.0,[0.9030899869919435,1.414973347970818])

EDIT: per clarification, now assuming each item in the input array contains the expected index in a resulting sparse vector:

"d90_pv_1sec:1.4471580313422192" --> index = 90; value = 1.4471580313422192

The modified code would be:

val vectorSize: Int = 100 // just a guess - should be the maximum index + 1

val result = df.rdd.map(r => {
  val label = r.getAs[Int]("label")
  val arr = r.getAs[mutable.WrappedArray[String]]("features").toArray
  // parse each item into (index, value) tuple to use in sparse vector
  val elements = arr.map(_.split(":")).map {
    case Array(s, d) => (s.replaceAll("d|_pv_1sec","").toInt, d.toDouble)
  }
  LabeledPoint(label, Vectors.sparse(vectorSize, elements))
})

result.foreach(println)
// (1.0,(100,[3,90],[0.9030899869919435,1.4471580313422192]))
// (2.0,(100,[7,30],[0.9030899869919435,1.414973347970818]))

NOTE: Using s.replaceAll("d|_pv_1sec","") might be a bit slow, as it compiles a regular expression for each item separately. If that's the case, it can be replaced by the faster (yet uglier) s.replace("d", "").replace("_pv_1sec", "") which doesn't use regular expressions.

这篇关于在Scala中将Spark Dataframe(带有WrappedArray)转换为RDD [labelPoint]的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆