Spark dataframes: Extract a column based on the value of another column
Question
I have a dataframe of transactions joined with a price list:
+----------+----------+------+-------+-------+
| paid | currency | EUR | USD | GBP |
+----------+----------+------+-------+-------+
| 49.5 | EUR | 99 | 79 | 69 |
+----------+----------+------+-------+-------+
A customer has paid 49.5 in EUR, as shown in the "currency" column. I now want to compare that paid price with the price from the price list.
Therefore I need to access the correct column based on the value of "currency", like so:
df.withColumn("saved", df.col(df.col($"currency")) - df.col("paid"))
which I hoped would become
df.withColumn("saved", df.col("EUR") - df.col("paid"))
This fails, however. I tried everything I could imagine, including a UDF, getting nowhere.
I guess there is some elegant solution for this? Can somebody help out here?
Answer
Assuming that the column names match the values in the currency column:
import org.apache.spark.sql.functions.{lit, col, coalesce}
import org.apache.spark.sql.Column
// Dummy data
val df = sc.parallelize(Seq(
(49.5, "EUR", 99, 79, 69), (100.0, "GBP", 80, 120, 50)
)).toDF("paid", "currency", "EUR", "USD", "GBP")
// A list of available currencies
val currencies: List[String] = List("EUR", "USD", "GBP")
// Select listed value
val listedPrice: Column = coalesce(
currencies.map(c => when($"currency" === c, col(c)).otherwise(lit(null))): _*)
df.select($"*", (listedPrice - $"paid").alias("difference")).show
// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5| EUR| 99| 79| 69| 49.5|
// |100.0| GBP| 80|120| 50| -50.0|
// +-----+--------+---+---+---+----------+
with the SQL equivalent of the listedPrice expression being something like this:
COALESCE(
  CASE WHEN (currency = 'EUR') THEN EUR ELSE null END,
  CASE WHEN (currency = 'USD') THEN USD ELSE null END,
  CASE WHEN (currency = 'GBP') THEN GBP ELSE null END
)
An alternative using foldLeft:
import org.apache.spark.sql.functions.when
val listedPriceViaFold = currencies.foldLeft(
lit(null))((acc, c) => when($"currency" === c, col(c)).otherwise(acc))
df.select($"*", (listedPriceViaFold - $"paid").alias("difference")).show
// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5| EUR| 99| 79| 69| 49.5|
// |100.0| GBP| 80|120| 50| -50.0|
// +-----+--------+---+---+---+----------+
where listedPriceViaFold translates to the following SQL:
CASE
  WHEN (currency = 'GBP') THEN GBP
  ELSE CASE
    WHEN (currency = 'USD') THEN USD
    ELSE CASE
      WHEN (currency = 'EUR') THEN EUR
      ELSE null
    END
  END
END
Unfortunately, I am not aware of any built-in function that could directly express SQL like this:
CASE currency
WHEN 'EUR' THEN EUR
WHEN 'USD' THEN USD
WHEN 'GBP' THEN GBP
ELSE null
END
but you can use this construct in raw SQL.
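A minimal sketch of running that construct through raw SQL, assuming a Spark 2.x SparkSession named spark and the df defined above (the view name "transactions" is my own choice):

```scala
// Register the dataframe so it can be queried with raw SQL
df.createOrReplaceTempView("transactions")

// CASE <expr> WHEN form, which the Column API cannot express directly
val result = spark.sql("""
  SELECT *,
         CASE currency
           WHEN 'EUR' THEN EUR
           WHEN 'USD' THEN USD
           WHEN 'GBP' THEN GBP
           ELSE null
         END - paid AS difference
  FROM transactions
""")
result.show
```

On Spark 1.x, where the answer's sc.parallelize style comes from, the equivalents would be df.registerTempTable and sqlContext.sql.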
If my assumption is not true, you can simply add a mapping between the column names and the values in the currency column.
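A sketch of such a mapping, assuming hypothetical currency values ("euro", "dollar", "pound") that do not match the column names; the Map and its keys are illustrative, not from the question:

```scala
// Hypothetical mapping: value in the currency column -> price-list column name
val currencyToColumn: Map[String, String] = Map(
  "euro"   -> "EUR",
  "dollar" -> "USD",
  "pound"  -> "GBP"
)

// Same coalesce-of-when pattern as above, but keyed through the mapping
val listedPriceMapped: Column = coalesce(
  currencyToColumn.map { case (value, colName) =>
    when($"currency" === value, col(colName))
  }.toSeq: _*
)
```

A when without an otherwise already evaluates to null for unmatched rows, so the explicit .otherwise(lit(null)) can be dropped here.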
Edit:
Another option, which could be efficient if the source supports predicate pushdown and efficient column pruning, is to subset by currency and union:
currencies.map(
  // for each currency, filter and compute the difference
  c => df.where($"currency" === c).withColumn("difference", col(c) - $"paid")
).reduce((df1, df2) => df1.unionAll(df2)) // union the per-currency subsets
which is equivalent to SQL like this:
SELECT *, EUR - paid AS difference FROM df WHERE currency = 'EUR'
UNION ALL
SELECT *, USD - paid AS difference FROM df WHERE currency = 'USD'
UNION ALL
SELECT *, GBP - paid AS difference FROM df WHERE currency = 'GBP'