Spark dataframes: Extract a column based on the value of another column

Problem description

I have a dataframe of transactions joined with a price list:

+----------+----------+------+-------+-------+
|   paid   | currency | EUR  |  USD  |  GBP  |
+----------+----------+------+-------+-------+
|   49.5   |   EUR    | 99   |  79   |  69   |
+----------+----------+------+-------+-------+

A customer has paid 49.5 in EUR, as shown in the "currency" column. I now want to compare the paid price with the price from the price list.

Therefore I need to access the correct column based on the value of "currency", like so:

df.withColumn("saved", df.col(df.col($"currency")) - df.col("paid"))

which I hoped would resolve to

df.withColumn("saved", df.col("EUR") - df.col("paid"))

This fails, however. I tried everything I could imagine, including a UDF, and got nowhere.

I suppose there is an elegant solution for this? Can somebody help out here?

Recommended answer

Assuming that the column names match the values in the currency column:

import org.apache.spark.sql.functions.{lit, col, coalesce}
import org.apache.spark.sql.Column 

// Dummy data
val df = sc.parallelize(Seq(
  (49.5, "EUR", 99, 79, 69), (100.0, "GBP", 80, 120, 50)
)).toDF("paid", "currency", "EUR", "USD", "GBP")

// A list of available currencies 
val currencies: List[String] = List("EUR", "USD", "GBP")

// Select listed value
val listedPrice: Column = coalesce(
  currencies.map(c => when($"currency" === c, col(c)).otherwise(lit(null))): _*)

df.select($"*", (listedPrice - $"paid").alias("difference")).show

// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5|     EUR| 99| 79| 69|      49.5|
// |100.0|     GBP| 80|120| 50|     -50.0|
// +-----+--------+---+---+---+----------+

The SQL equivalent of the listedPrice expression is something like this:

COALESCE(
  CASE WHEN (currency = 'EUR') THEN EUR ELSE null END,
  CASE WHEN (currency = 'USD') THEN USD ELSE null END,
  CASE WHEN (currency = 'GBP') THEN GBP ELSE null END
)

An alternative approach using foldLeft:

import org.apache.spark.sql.functions.when

val listedPriceViaFold = currencies.foldLeft(
  lit(null))((acc, c) => when($"currency" === c, col(c)).otherwise(acc))

df.select($"*", (listedPriceViaFold - $"paid").alias("difference")).show

// +-----+--------+---+---+---+----------+
// | paid|currency|EUR|USD|GBP|difference|
// +-----+--------+---+---+---+----------+
// | 49.5|     EUR| 99| 79| 69|      49.5|
// |100.0|     GBP| 80|120| 50|     -50.0|
// +-----+--------+---+---+---+----------+

where listedPriceViaFold translates to the following SQL:

CASE
  WHEN (currency = 'GBP') THEN GBP
  ELSE CASE
    WHEN (currency = 'USD') THEN USD
    ELSE CASE
      WHEN (currency = 'EUR') THEN EUR
      ELSE null
    END
  END
END

Unfortunately, I am not aware of any built-in function that could directly express SQL like this:

CASE currency
    WHEN 'EUR' THEN EUR
    WHEN 'USD' THEN USD
    WHEN 'GBP' THEN GBP
    ELSE null
END

but you can use this construct in raw SQL.
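
As a minimal sketch of that raw-SQL route (assuming a Spark 2.x SparkSession named spark and a hypothetical view name "transactions"; on Spark 1.x you would use registerTempTable and sqlContext.sql instead):

// Register the DataFrame under a temporary view name first
df.createOrReplaceTempView("transactions")

spark.sql("""
  SELECT *,
         CASE currency
             WHEN 'EUR' THEN EUR
             WHEN 'USD' THEN USD
             WHEN 'GBP' THEN GBP
             ELSE null
         END - paid AS difference
  FROM transactions
""").show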

If my assumption is not true, you can simply add a mapping between the column names and the values in the currency column.
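
A sketch of that mapping idea (the price_eur, price_usd, and price_gbp column names below are hypothetical, not from the original data):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{coalesce, col, when}

// Hypothetical mapping from currency values to price-list column names
val columnForCurrency: Map[String, String] = Map(
  "EUR" -> "price_eur", "USD" -> "price_usd", "GBP" -> "price_gbp")

// when(...) without otherwise already yields null for non-matching rows,
// so coalesce keeps the single matching column per row
val mappedPrice: Column = coalesce(
  columnForCurrency.toSeq.map { case (cur, colName) =>
    when($"currency" === cur, col(colName))
  }: _*)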

Edit:

Another option, which could be efficient if the source supports predicate pushdown and efficient column pruning, is to subset by currency and union:

currencies.map(
  // for each currency filter and add difference
  c => df.where($"currency" === c).withColumn("difference", $"paid" - col(c))
).reduce((df1, df2) => df1.unionAll(df2)) // Union
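
Note that rows whose currency does not appear in currencies are dropped by this approach, and on Spark 2.x unionAll is deprecated in favor of union.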

It is equivalent to SQL like this:

SELECT *,  EUR - paid AS difference FROM df WHERE currency = 'EUR'
UNION ALL
SELECT *,  USD - paid AS difference FROM df WHERE currency = 'USD'
UNION ALL
SELECT *,  GBP - paid AS difference FROM df WHERE currency = 'GBP'
