How to transform DataFrame before joining operation?

Problem description

The following code is used to extract ranks from the products column. The ranks are the second numbers in each pair [...]. For example, in the given sample [[222,66],[333,55]], the ranks are 66 and 55 for the products with PK 222 and 333, respectively. But the code works very slowly in Spark 2.2 when df_products is around 800 MB:

df_products.createOrReplaceTempView("df_products")

val result = df.as("df2")
  .join(
    spark.sql("SELECT * FROM df_products")
      .select($"product_PK", explode($"products").as("products"))
      .withColumnRenamed("product_PK", "product_PK_temp").as("df1"),
    $"df2.product_PK" === $"df1.product_PK_temp" and
      $"df2.rec_product_PK" === $"df1.products.product_PK",
    "left")
  .drop($"df1.product_PK_temp")
  .select($"product_PK", $"rec_product_PK",
    coalesce($"df1.products.col2", lit(0.0)).as("rank_product"))

Here is a small sample of df_products and df:

df_products =

+----------+--------------------+
|product_PK|            products|
+----------+--------------------+
|       111|[[222,66],[333,55...|
|       222|[[333,24],[444,77...|
...
+----------+--------------------+

df =

+----------+-----------------+                 
|product_PK|   rec_product_PK|
+----------+-----------------+
|       111|              222|
|       222|              888|
+----------+-----------------+

The code above works well when the arrays in each row of products contain a small number of elements. But when there are many elements in the arrays of each row [[..],[..],...], the code seems to get stuck and does not advance.

How can I optimize the code? Any help is highly appreciated.

Is it possible, for example, to transform df_products into the following DataFrame before joining?

df_products =

+----------+--------------------+------+
|product_PK|      rec_product_PK|  rank|
+----------+--------------------+------+
|       111|                 222|    66|
|       111|                 333|    55|
|       222|                 333|    24|
|       222|                 444|    77|
...
+----------+--------------------+------+

Solution

As per my answer here, you can transform df_products using something like this:

import org.apache.spark.sql.functions.explode

// One output row per element of the `products` array
val df1 = df_products.withColumn("array_elem", explode(df_products("products")))
// Promote the struct fields of each element to top-level columns
val df2 = df1.select("product_PK", "array_elem.*")
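
Building on that, here is a minimal sketch of the full approach, under the assumption that each element of products is a struct whose fields are named product_PK and col2 (as the references $"df1.products.product_PK" and $"df1.products.col2" in the question suggest). Flattening first produces exactly the DataFrame shown in the question, and the join then runs on plain top-level columns:

import org.apache.spark.sql.functions.{coalesce, explode, lit}
import spark.implicits._  // for the $"..." column syntax

// Assumed struct field names: `product_PK` and `col2`
val df_flat = df_products
  .select($"product_PK", explode($"products").as("elem"))
  .select(
    $"product_PK",
    $"elem.product_PK".as("rec_product_PK"),
    $"elem.col2".as("rank"))

// Join on plain columns; missing matches get rank 0.0 as in the question
val result = df.join(df_flat, Seq("product_PK", "rec_product_PK"), "left")
  .select($"product_PK", $"rec_product_PK",
    coalesce($"rank", lit(0.0)).as("rank_product"))

With the keys as top-level columns, Spark can plan an ordinary equi-join instead of resolving struct fields inside the join condition.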

This assumes products is an array of structs. If products is an array of arrays, you can use the following instead:

// Take the rank (second element) of each inner array by position
val df2 = df1.withColumn("rank", df1("array_elem").getItem(1))
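
For completeness, a sketch of the same flattening for the array-of-arrays case, assuming each inner array is laid out as [rec_product_PK, rank]:

// Assumption: each inner array looks like [rec_product_PK, rank]
val df_flat = df_products
  .select($"product_PK", explode($"products").as("elem"))
  .select(
    $"product_PK",
    $"elem".getItem(0).as("rec_product_PK"),
    $"elem".getItem(1).as("rank"))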
