Spark Scala - How to group dataframe rows and apply a complex function to the groups?
Question
I am trying to solve this seemingly simple problem and I am already sick of it; I hope somebody can help me out. I have a dataframe shaped like this:
+----------+------------+
| Category | Product_ID |
+----------+------------+
| a        | product 1  |
| a        | product 2  |
| a        | product 3  |
| a        | product 1  |
| a        | product 4  |
| b        | product 5  |
| b        | product 6  |
+----------+------------+
How do I group these rows by category and apply a complicated function to each group in Scala? Maybe something like this:
val result = df.groupBy("Category").apply(myComplexFunction)
This myComplexFunction should produce the following table for each category and upload the pairwise similarities into a Hive table or save them to HDFS:
+-----------+-----------+-----------+-----------+
|           | Product_1 | Product_2 | Product_3 |
+-----------+-----------+-----------+-----------+
| Product_1 | 1.0       | 0.1       | 0.8       |
| Product_2 | 0.1       | 1.0       | 0.5       |
| Product_3 | 0.8       | 0.5       | 1.0       |
+-----------+-----------+-----------+-----------+
Here is the function I want to apply (it just computes item-item cosine similarity within each category):
def myComplexFunction(context_data: DataFrame, country_name: String,
                      context_id: String, table_name_correlations: String,
                      context_layer: String, context_index: String): Boolean = {
  val unique_identifier = country_name + "_" + context_layer + "_" + context_index
  val temp_table_vocabulary = "temp_vocabulary_" + unique_identifier
  val temp_table_similarities = "temp_similarities_" + unique_identifier
  val temp_table_correlations = "temp_correlations_" + unique_identifier
  //context.count()
  // fit a CountVectorizerModel from the corpus
  //println("Creating sparse incidence matrix")
  val cvModel: CountVectorizerModel = new CountVectorizer()
    .setInputCol("words")
    .setOutputCol("features")
    .fit(context_data)
  val incidence = cvModel.transform(context_data)
  // ========================================================================================
  // create dataframe of mapping from indices into the item id
  //println("Creating vocabulary")
  val vocabulary_rdd = sc.parallelize(cvModel.vocabulary)
  val rows_vocabulary_rdd = vocabulary_rdd.zipWithIndex.map { case (s, i) => Row(s, i) }
  val vocabulary_field1 = StructField("Product_ID", StringType, true)
  val vocabulary_field2 = StructField("Product_Index", LongType, true)
  val schema_vocabulary = StructType(Seq(vocabulary_field1, vocabulary_field2))
  val df_vocabulary = hiveContext.createDataFrame(rows_vocabulary_rdd, schema_vocabulary)
  // ========================================================================================
  //println("Computing similarity matrix")
  val myvectors = incidence.select("features").rdd.map(r => r(0).asInstanceOf[Vector])
  val mat: RowMatrix = new RowMatrix(myvectors)
  val sims = mat.columnSimilarities(0.0)
  // ========================================================================================
  // Convert records of the Matrix Entry RDD into Rows
  //println("Extracting paired similarities")
  val rowRdd = sims.entries.map { case MatrixEntry(i, j, v) => Row(i, j, v) }
  // ========================================================================================
  // create dataframe schema
  //println("Creating similarity dataframe")
  val field1 = StructField("Product_Index", LongType, true)
  val field2 = StructField("Neighbor_Index", LongType, true)
  val field3 = StructField("Similarity_Score", DoubleType, true)
  val schema_similarities = StructType(Seq(field1, field2, field3))
  // create the dataframe
  val df_similarities = hiveContext.createDataFrame(rowRdd, schema_similarities)
  // ========================================================================================
  //println("Register vocabulary and correlations as spark temp tables")
  df_vocabulary.registerTempTable(temp_table_vocabulary)
  df_similarities.registerTempTable(temp_table_similarities)
  // ========================================================================================
  //println("Extracting Product_ID")
  val temp_corrs = hiveContext.sql(
    s"SELECT T1.Product_ID, T2.Neighbor_ID, T1.Similarity_Score " +
    s"FROM " +
    s"(SELECT Product_ID, Neighbor_Index, Similarity_Score " +
    s"FROM $temp_table_similarities LEFT JOIN $temp_table_vocabulary " +
    s"WHERE $temp_table_similarities.Product_Index = $temp_table_vocabulary.Product_Index) AS T1 " +
    s"LEFT JOIN " +
    s"(SELECT Product_ID AS Neighbor_ID, Product_Index as Neighbor_Index FROM $temp_table_vocabulary) AS T2 " +
    s"ON " +
    s"T1.Neighbor_Index = T2.Neighbor_Index")
  // ========================================================================================
  val context_corrs = temp_corrs
    .withColumn("Context_Layer", lit(context_layer))
    .withColumn("Context_ID", lit(context_id))
    .withColumn("Country", lit(country_name))
  context_corrs.registerTempTable(temp_table_correlations)
  // ========================================================================================
  hiveContext.sql(s"INSERT INTO TABLE $table_name_correlations SELECT * FROM $temp_table_correlations")
  // ========================================================================================
  // clean up environment
  //println("Cleaning up temp tables")
  hiveContext.dropTempTable(temp_table_correlations)
  hiveContext.dropTempTable(temp_table_similarities)
  hiveContext.dropTempTable(temp_table_vocabulary)
  true
}
val partitioned = tokenized.repartition(tokenized("context_id"))
val context_counts = partitioned.mapPartitions()
//val context_counts = model_code_ids.zipWithIndex.map{case (model_code_id, context_index) => compute_similarity(tokenized.filter(tokenized("context_id") === model_code_id), country_name, model_code_id.asInstanceOf[String], table_name_correlations, context_layer, context_index.toString)}
}
I have already tried:
val category_ids = df.select("Category").distinct.collect()
val result = category_ids.map(category_id => myComplexFunction(df.filter(df("Category") <=> category_id)))
I don't know why, but this approach runs sequentially and not in parallel.
Recommended answer
Cosine similarity is not a complex function and can be expressed using standard SQL aggregations. Let's consider the following example:
val df = Seq(
  ("feat1", 1.0, "item1"),
  ("feat2", 1.0, "item1"),
  ("feat6", 1.0, "item1"),
  ("feat1", 1.0, "item2"),
  ("feat3", 1.0, "item2"),
  ("feat4", 1.0, "item3"),
  ("feat5", 1.0, "item3"),
  ("feat1", 1.0, "item4"),
  ("feat6", 1.0, "item4")
).toDF("feature", "value", "item")
where feature is a feature identifier, value is the corresponding value, item is the object identifier, and each (feature, item) pair has only one corresponding value.
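Cosine similarity is defined as:
$$\cos(\mathbf{x}, \mathbf{y}) = \frac{\sum_i x_i y_i}{\sqrt{\sum_i x_i^2} \, \sqrt{\sum_i y_i^2}}$$
where the numerator can be computed as: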
val numer = df.as("this").withColumnRenamed("item", "this")
  .join(df.as("other").withColumnRenamed("item", "other"), Seq("feature"))
  .where($"this" < $"other")
  .groupBy($"this", $"other")
  .agg(sum($"this.value" * $"other.value").alias("dot"))
and the norms used in the denominator as:
import org.apache.spark.sql.functions.sqrt
val norms = df.groupBy($"item").agg(sqrt(sum($"value" * $"value")).alias("norm"))
Combined together:
val cosine = ($"dot" / ($"this_norm.norm" * $"other_norm.norm")).as("cosine")
val similarities = numer
  .join(norms.alias("this_norm").withColumnRenamed("item", "this"), Seq("this"))
  .join(norms.alias("other_norm").withColumnRenamed("item", "other"), Seq("other"))
  .select($"this", $"other", cosine)
with the result representing the non-zero entries of the upper triangular matrix, ignoring the diagonal (which is trivial):
+-----+-----+-------------------+
| this|other| cosine|
+-----+-----+-------------------+
|item1|item4| 0.8164965809277259|
|item1|item2|0.40824829046386296|
|item2|item4| 0.4999999999999999|
+-----+-----+-------------------+
This should be equivalent to:
import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.mllib.linalg.Vectors
val pivoted = df.groupBy("item").pivot("feature").sum()
  .na.fill(0.0)
  .orderBy("item")
val mat = new IndexedRowMatrix(pivoted
  .select(array(pivoted.columns.tail.map(col): _*))
  .rdd
  .zipWithIndex
  .map {
    case (row, idx) =>
      new IndexedRow(idx, Vectors.dense(row.getSeq[Double](0).toArray))
  })
mat.toCoordinateMatrix.transpose
  .toIndexedRowMatrix.columnSimilarities
  .toBlockMatrix.toLocalMatrix
0.0 0.408248290463863 0.0 0.816496580927726
0.0 0.0 0.0 0.4999999999999999
0.0 0.0 0.0 0.0
0.0 0.0 0.0 0.0
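The three non-zero cosine values can be cross-checked without Spark. The following is a minimal plain-Scala sketch that applies the same definition (dot product divided by the product of the norms) to the toy data; the object name CosineCheck and the helpers dot and norm are made up for illustration and are not part of the original answer.

```scala
object CosineCheck {
  // Same toy data as the example DataFrame: (feature, value, item).
  val rows: Seq[(String, Double, String)] = Seq(
    ("feat1", 1.0, "item1"), ("feat2", 1.0, "item1"), ("feat6", 1.0, "item1"),
    ("feat1", 1.0, "item2"), ("feat3", 1.0, "item2"),
    ("feat4", 1.0, "item3"), ("feat5", 1.0, "item3"),
    ("feat1", 1.0, "item4"), ("feat6", 1.0, "item4")
  )

  // item -> (feature -> value); assumes each (feature, item) pair occurs once.
  val vectors: Map[String, Map[String, Double]] =
    rows.groupBy(_._3).map { case (item, rs) =>
      item -> rs.map(r => r._1 -> r._2).toMap
    }

  // Dot product over the features the two items share.
  def dot(x: Map[String, Double], y: Map[String, Double]): Double =
    x.keySet.intersect(y.keySet).toSeq.map(f => x(f) * y(f)).sum

  // Euclidean norm of a sparse vector.
  def norm(x: Map[String, Double]): Double =
    math.sqrt(x.values.map(v => v * v).sum)

  // Upper triangle only (a < b), keeping non-zero entries,
  // mirroring the where($"this" < $"other") filter in the answer.
  val cosines: Map[(String, String), Double] = {
    val items = vectors.keys.toSeq.sorted
    (for {
      a <- items
      b <- items
      if a < b
      d = dot(vectors(a), vectors(b))
      if d != 0.0
    } yield (a, b) -> d / (norm(vectors(a)) * norm(vectors(b)))).toMap
  }

  def main(args: Array[String]): Unit =
    cosines.toSeq.sortBy(_._1).foreach { case ((a, b), c) => println(s"$a $b $c") }
}
```

item3 shares no feature with any other item, so it produces no entry, which matches the zero rows and columns in the matrix above.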
Regarding your code:
- Execution is sequential because your code operates on a local (collected) collection.
- myComplexFunction cannot be further distributed because it uses distributed data structures and contexts.
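Since the aggregation approach needs no per-group function call, one way to cover every category in a single distributed job is to add the category to the join and grouping keys. The sketch below is an illustration under assumptions, not the original answer's code: it assumes a long-format input with columns category, feature, value and item (the question's data would first have to be brought into that shape), uses the Spark 2.x SparkSession API rather than the question's hiveContext, and the object name PerCategoryCosine is hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{sqrt, sum}

object PerCategoryCosine {
  // Expects columns: category, feature, value, item (assumed layout).
  def similarities(df: DataFrame): DataFrame = {
    val spark = df.sparkSession
    import spark.implicits._

    // Two renamed copies of the input so the self-join has unambiguous columns.
    val left = df.select($"category", $"feature",
      $"item".as("this"), $"value".as("this_value"))
    val right = df.select($"category", $"feature",
      $"item".as("other"), $"value".as("other_value"))

    // Dot products, restricted to item pairs within the same category.
    val numer = left.join(right, Seq("category", "feature"))
      .where($"this" < $"other")
      .groupBy($"category", $"this", $"other")
      .agg(sum($"this_value" * $"other_value").as("dot"))

    // Per-item norms, also keyed by category.
    val norms = df.groupBy($"category", $"item")
      .agg(sqrt(sum($"value" * $"value")).as("norm"))

    numer
      .join(norms.select($"category", $"item".as("this"), $"norm".as("this_norm")),
        Seq("category", "this"))
      .join(norms.select($"category", $"item".as("other"), $"norm".as("other_norm")),
        Seq("category", "other"))
      .select($"category", $"this", $"other",
        ($"dot" / ($"this_norm" * $"other_norm")).as("cosine"))
  }
}
```

The result is an ordinary DataFrame, so it can be written out with the usual DataFrame writer instead of being inserted category by category from the driver.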