Reshape Spark DataFrame from Long to Wide On Large Data Sets
Problem Description
I am trying to reshape my dataframe from long to wide using the Spark DataFrame API. The data set is a collection of questions and answers from students' questionnaires. It is a huge data set, and Q (Question) and A (Answer) each range from approximately 1 to 50,000. I would like to collect all the possible Q*A pairs and use them to build columns. If a student answered 1 to Question 1, we assign the value 1 to column 1_1. Otherwise, we give it a 0. The data set has been de-duplicated on S_ID, Q, A.
In R, I can simply use dcast from the reshape2 library, but I don't know how to do it using Spark. I have found a solution for pivoting in the link below, but it requires a fixed number of distinct Q*A pairs. http://rajasoftware.net/index.php/database/91446/scala-apache-spark-pivot-dataframes-pivot-spark-dataframe
I also tried concatenating Q and A using a user-defined function and then applying crosstab. However, I got the error below from the console, even though so far I have only tested my code on a sample data file:
The maximum limit of 1e6 pairs have been collected, which may not be all of the pairs. Please try reducing the amount of distinct items in your columns.
Original Data:
S_ID, Q, A
1, 1, 1
1, 2, 2
1, 3, 3
2, 1, 1
2, 2, 3
2, 3, 4
2, 4, 5

=> After long-to-wide transformation:
S_ID, QA_1_1, QA_2_2, QA_3_3, QA_2_3, QA_3_4, QA_4_5
1, 1, 1, 1, 0, 0, 0
2, 1, 0, 0, 1, 1, 1
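To make the encoding rule above concrete, here is a small plain-Python sketch (outside Spark, for illustration only) that reproduces the sample long-to-wide transformation:

```python
# Plain-Python sketch of the long-to-wide encoding shown above.
# Each distinct (Q, A) pair becomes a binary column QA_<Q>_<A>.
rows = [  # (S_ID, Q, A), already de-duplicated
    (1, 1, 1), (1, 2, 2), (1, 3, 3),
    (2, 1, 1), (2, 2, 3), (2, 3, 4), (2, 4, 5),
]

# Collect every observed (Q, A) pair to define the wide columns.
pairs = sorted({(q, a) for _, q, a in rows})
columns = ["QA_%d_%d" % p for p in pairs]

# One wide row per student: 1 if the pair was observed, else 0.
wide = {}
for s_id, q, a in rows:
    wide.setdefault(s_id, {c: 0 for c in columns})["QA_%d_%d" % (q, a)] = 1
```

On the sample data this yields the six columns and the two binary rows shown above; with Q and A each ranging up to 50,000, the number of such columns explodes, which is the core of the problem.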
R code:

```r
library(dplyr); library(reshape2)
df1 <- df %>% group_by(S_ID, Q, A) %>% filter(row_number() == 1) %>% mutate(temp = 1)
df1 %>% dcast(S_ID ~ Q + A, value.var = "temp", fill = 0)
```

Spark code:

```scala
val fnConcatenate = udf((x: String, y: String) => {"QA_" + x + "_" + y})
df1 = df.distinct.withColumn("QA", fnConcatenate($"Q", $"A"))
df2 = stat.crosstab("S_ID", "QA")
```
Any thought would be appreciated.
Solution

What you are trying to do here is faulty by design, for two reasons:
- You replace a sparse data set with a dense one. This is expensive both in memory requirements and in computation, and it is almost never a good idea when you have a large dataset.
- You limit the ability to process data locally. Simplifying things a little bit, Spark data frames are just wrappers around `RDD[Row]`. This means that the larger the row, the fewer rows you can place on a single partition, and consequently operations like aggregations are much more expensive and require more network traffic.

Wide tables are useful when you have proper columnar storage, where you can implement things like efficient compression or aggregations. From a practical perspective, almost everything you can do with a wide table can be done with a long one using group / window functions.
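To see the density cost concretely, here is a hedged plain-Python sketch (the 50,000 figure comes from the question; the answered indices are made up for illustration). A dense row carries one slot per possible QA column, while a sparse representation stores only the pairs a student actually answered:

```python
n_columns = 50_000          # roughly the number of distinct QA pairs
answered = [7, 123, 9_999]  # indices of the pairs one student actually hit

# Dense: one slot per possible column, almost all zeros.
dense = [0] * n_columns
for i in answered:
    dense[i] = 1

# Sparse: store only the non-zero indices (value 1.0 for each).
sparse = {i: 1.0 for i in answered}

# The dense row spends 50,000 entries to represent 3 facts;
# the sparse one spends 3.
```

Multiply that per-row overhead by millions of students and the dense layout dominates both memory and shuffle traffic, which is the first design objection above.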
One thing you can try is to use sparse vectors to create a wide-like format:
```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.StringIndexer
import sqlContext.implicits._

df.registerTempTable("df")

val dfComb = sqlContext.sql("SELECT s_id, CONCAT(Q, '\t', A) AS qa FROM df")

val indexer = new StringIndexer()
  .setInputCol("qa")
  .setOutputCol("idx")
  .fit(dfComb)

val indexed = indexer.transform(dfComb)
val n = indexed.agg(max("idx")).first.getDouble(0).toInt + 1

val wideLikeDF = indexed
  .select($"s_id", $"idx")
  .rdd
  .map{case Row(s_id: String, idx: Double) => (s_id, idx.toInt)}
  .groupByKey // This assumes no duplicates
  .mapValues(vals => Vectors.sparse(n, vals.map((_, 1.0)).toArray))
  .toDF("id", "qaVec")
```
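For intuition, the same index-then-group logic can be mimicked outside Spark. This is a plain-Python sketch of the idea, not the MLlib API (StringIndexer actually orders labels by frequency; plain sorted order is used here for simplicity):

```python
# (s_id, "Q\tA") pairs, matching the CONCAT(Q, '\t', A) step above.
rows = [("1", "1\t1"), ("1", "2\t2"), ("1", "3\t3"),
        ("2", "1\t1"), ("2", "2\t3"), ("2", "3\t4"), ("2", "4\t5")]

# Step 1: assign each distinct qa string an integer index.
idx = {qa: i for i, qa in enumerate(sorted({qa for _, qa in rows}))}
n = len(idx)

# Step 2: group indices per student and keep only the (index, 1.0)
# pairs, i.e. a sparse vector instead of an n-length dense row.
wide_like = {}
for s_id, qa in rows:
    wide_like.setdefault(s_id, []).append((idx[qa], 1.0))
```

Each student ends up with a short list of active indices rather than one column per possible QA pair, which is exactly what `Vectors.sparse(n, ...)` encodes above.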
The cool part here is that you can easily convert it to an `IndexedRowMatrix` and, for example, compute the SVD:

```scala
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val mat = new IndexedRowMatrix(wideLikeDF.map{
  // Here we assume that s_id can be mapped directly to Long
  // If not it has to be indexed
  case Row(id: String, qaVec: SparseVector) => IndexedRow(id.toLong, qaVec)
})

val svd = mat.computeSVD(3)
```
or to a `RowMatrix` and get column statistics or compute principal components:

```scala
val colStats = mat.toRowMatrix.computeColumnSummaryStatistics
val colSims = mat.toRowMatrix.columnSimilarities
val pc = mat.toRowMatrix.computePrincipalComponents(3)
```
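As a rough picture of what per-column summaries compute over such sparse rows, here is a hedged plain-Python sketch (illustrative only, not the MLlib implementation; the sample rows are made up):

```python
n = 6  # number of columns (distinct QA pairs)
sparse_rows = [                               # (index, value) pairs per student
    [(0, 1.0), (2, 1.0), (4, 1.0)],           # student 1
    [(0, 1.0), (1, 1.0), (3, 1.0), (5, 1.0)], # student 2
]

# Per-column sum and mean over all rows; absent entries are implicit zeros.
col_sum = [0.0] * n
for row in sparse_rows:
    for j, v in row:
        col_sum[j] += v
col_mean = [s / len(sparse_rows) for s in col_sum]
```

Because the zeros are never materialized, such statistics stay cheap even when n is in the tens of thousands.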
Edit:
In Spark 1.6.0+ you can use the `pivot` function.