Column Name inside column of dataframe in spark with scala


Problem description

I am using Spark 2.4.3 with Scala.

My salesperson DataFrame looks like this. It has 54 salespersons in total; I am showing only 3 columns as an example.

Schema of SalesPerson table.
root
 |-- col: struct (nullable = false)
 |    |-- SalesPerson_1: string (nullable = true)
 |    |-- SalesPerson_2: string (nullable = true)
 |    |-- SalesPerson_3: string (nullable = true)

销售员"视图的数据.

SalesPerson_1 |SalesPerson_2|SalesPerson_3
------------------------------------------
[Customer_1793, Customer_202, Customer_2461]
[Customer_2424, Customer_130, Customer_787]
[Customer_1061, Customer_318, Customer_706]

My salesplace DataFrame looks like:

Schema of salesplace
 
 root
 |-- Place: string (nullable = true)
 |-- Customer: string (nullable = true)

Data of salesplace
Place|Customer
Online| Customer_1793
Retail| Customer_1793
Retail| Customer_130
Online| Customer_130
Online| Customer_2461
Retail| Customer_2461
Online| Customer_2461

I am trying to check which customers from the SalesPerson table appear in the SalesPlace table, with two additional columns: one showing which salesperson the customer belongs to, and one with the count of the customer's occurrences in the SalesPlace table.

Expected output:

CustomerBelongstoSalesperson|Customer     |occurance|
SalesPerson_1               |Customer_1793|2
SalesPerson_2               |Customer_130 |2 
SalesPerson_3               |Customer_2461|3
SalesPerson_2               |Customer_202 |0
SalesPerson_1               |Customer_2424|0
SalesPerson_1               |Customer_1061|0
SalesPerson_2               |Customer_318 |0
SalesPerson_3               |Customer_787 |0

Code:

Error:
The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 54 aliases but got Salesperson,Customer ;
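
This error typically means the UDTF (here, stack) produced 54 output columns while the AS clause supplied only two aliases. The failing code is not shown, but a hypothetical reproduction (assuming a DataFrame df whose 54 salesperson columns are already flattened) would be:

    // Hypothetical: stack(1, c1, ..., c54) emits ONE row with 54 columns,
    // so aliasing the result as (Salesperson, Customer) fails with
    // "expected 54 aliases but got Salesperson,Customer"
    val cols = df.columns.mkString(", ")
    df.selectExpr(s"stack(1, $cols) as (Salesperson, Customer)")

The fix, shown in the answer below, is to call stack with the column count and name/value pairs so it emits 54 rows of two columns.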

This seems a little tricky in Spark. I am not sure if it is possible to bring the column name inside the column as a value. Could someone please help me with an idea of how to do this? Thanks.

Recommended answer

Try this -

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for .toDS()

// Build a small test DataFrame from a pipe-delimited multiline string
val data1 =
  """
    |salesperson1        |salesperson2
    |Customer_17         |Customer_202
    |Customer_24         |Customer_130
  """.stripMargin
val stringDS1 = data1.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
// csv(Dataset[String]) parses in-memory lines as CSV (available since Spark 2.2)
val df1 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS1)
df1.show(false)
df1.printSchema()
    /**
      * +------------+------------+
      * |salesperson1|salesperson2|
      * +------------+------------+
      * |Customer_17 |Customer_202|
      * |Customer_24 |Customer_130|
      * +------------+------------+
      *
      * root
      * |-- salesperson1: string (nullable = true)
      * |-- salesperson2: string (nullable = true)
      */

// Build the SalesPlace test DataFrame the same way
val data2 =
  """
    |Place  |Customer
    |shop  |Customer_17
    |Home  |Customer_17
    |shop  |Customer_17
    |Home  |Customer_130
    |Shop  |Customer_202
  """.stripMargin
val stringDS2 = data2.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df2 = spark.read
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS2)
df2.show(false)
df2.printSchema()
    /**
      * +-----+------------+
      * |Place|Customer    |
      * +-----+------------+
      * |shop |Customer_17 |
      * |Home |Customer_17 |
      * |shop |Customer_17 |
      * |Home |Customer_130|
      * |Shop |Customer_202|
      * +-----+------------+
      *
      * root
      * |-- Place: string (nullable = true)
      * |-- Customer: string (nullable = true)
      */

Unpivot (stack) + left join. First unpivot df1 so that every (column name, value) pair becomes a (Salesperson, Customer) row, then left-join with df2 and count occurrences.

// Build pairs of ('column_name', column_value) for every column of df1,
// then stack() them into (Salesperson, Customer) rows
val stringCol = df1.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
val processedDF = df1.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)")
processedDF.show(false)
    /**
      * +------------+------------+
      * |Salesperson |Customer    |
      * +------------+------------+
      * |salesperson1|Customer_17 |
      * |salesperson2|Customer_202|
      * |salesperson1|Customer_24 |
      * |salesperson2|Customer_130|
      * +------------+------------+
      */
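
For the two-column df1 above, the expression built from stringCol expands to the following (illustrative; with the question's 54 columns there would be 54 name/value pairs):

    stack(2, 'salesperson1', cast(`salesperson1` as string),
             'salesperson2', cast(`salesperson2` as string)) as (Salesperson, Customer)

stack(n, k1, v1, ..., kn, vn) emits n rows of two columns, so each quoted column name becomes a Salesperson value and each cast column value becomes a Customer value.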

// The left join keeps unmatched customers (their Place is null), and
// count("Place") counts only non-null values, so they get 0;
// first("Salesperson") is safe because each customer has exactly one salesperson
processedDF.join(df2, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Place").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)

    /**
      * +------------+---------+------------+
      * |Customer    |Occurance|Salesperson |
      * +------------+---------+------------+
      * |Customer_130|1        |salesperson2|
      * |Customer_17 |3        |salesperson1|
      * |Customer_202|1        |salesperson2|
      * |Customer_24 |0        |salesperson1|
      * +------------+---------+------------+
      */
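
Note that in the question the salesperson names live inside a struct column named col, while df1 above has them as top-level columns. A sketch of the same approach adapted to that schema, assuming the question's two DataFrames are named salesPersonDF and salesPlaceDF (placeholder names):

    import org.apache.spark.sql.functions.count

    // Flatten the struct so every SalesPerson_N becomes a top-level column
    val flat = salesPersonDF.select("col.*")

    // Unpivot all 54 columns into (CustomerBelongstoSalesperson, Customer) rows
    val stackExpr = flat.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
    val unpivoted = flat.selectExpr(
      s"stack(${flat.columns.length}, $stackExpr) as (CustomerBelongstoSalesperson, Customer)")

    // Left join keeps customers with no sales (Place is null);
    // count("Place") skips nulls, so those customers get occurance = 0
    unpivoted.join(salesPlaceDF, Seq("Customer"), "left")
      .groupBy("CustomerBelongstoSalesperson", "Customer")
      .agg(count("Place").as("occurance"))
      .show(false)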

