Spark dataframe - Replace tokens of a common string with column values for each row using Scala


Problem Description

I have a dataframe with 3 columns - Number (Integer), Name (String), Color (String). Below is the result of df.show with the repartition option.

val df = sparkSession.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .option("delimiter", ",")
  .option("encoding", "utf8") // the CSV reader's option is "encoding"; "decoding" is not a valid option
  .load(fileName)
  .repartition(5)

+------+------+------+
|Number|  Name| Color|
+------+------+------+
|     4|Orange|Orange|
|     3| Apple| Green|
|     1| Apple|   Red|
|     2|Banana|Yellow|
|     5| Apple|   Red|
+------+------+------+

My objective is to create a list of strings, one per row, by replacing the tokens in a common dynamic string (which I pass to the method as a parameter) with the column values. For example: commonDynamicString = "Column.Name with Column.Color color"

In this string, my tokens are Column.Name and Column.Color. I need to replace these tokens, for every row, with the corresponding values from those columns. Note: this string can change dynamically, hence hardcoding won't work.

I don't want to use RDDs unless no other option is available with dataframes.

Below are the approaches I tried, but neither achieved my objective.

Option 1:

val a = df.foreach(t => {
  // Replace each token in the common string with this row's column values
  val finalValue = commonString.replace("Column.Number", t.getAs[Any]("Number").toString)
    .replace("Column.Name", t.getAs[String]("Name"))
    .replace("Column.Color", t.getAs[String]("Color"))

  println("finalValue: " + finalValue)
})

With this approach, finalValue prints as expected. However, I cannot create a ListBuffer or pass the final strings from here to another function as a list, because foreach returns Unit and Spark throws an error.
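For reference, one way around the Unit return type (a minimal sketch, assuming the data fits in driver memory and reusing the commonString template from above) is to collect the rows to the driver and build the list there:

val finalList: List[String] = df.collect().map { t =>
  // Same token substitution as in the foreach attempt, but executed on the
  // driver, so the results can be gathered into an ordinary Scala List
  commonString.replace("Column.Number", t.getAs[Any]("Number").toString)
    .replace("Column.Name", t.getAs[String]("Name"))
    .replace("Column.Color", t.getAs[String]("Color"))
}.toList

Unlike foreach, which runs on the executors, collect() returns an Array[Row] on the driver, where a regular collection can be assembled. For large datasets, though, a column-based approach like Option 2 below avoids pulling everything to the driver.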

Option 2: I am thinking about this option but would need some guidance to understand whether foldLeft, a window, or any other Spark function can be used to create a 4th column called "Final" using withColumn and a UDF, where I can extract all the tokens using the regex pattern "Column.\w+" and do the replace operation for the tokens? (A sketch of this idea follows the example output below.)

+------+------+------+--------------------------+
|Number|  Name| Color|Final                     |
+------+------+------+--------------------------+
|     4|Orange|Orange|Orange with orange color  |
|     3| Apple| Green|Apple with Green color    |
|     1| Apple|   Red|Apple with Red color      |
|     2|Banana|Yellow|Banana with Yellow color  |
|     5| Apple|   Red|Apple with Red color      |
+------+------+------+--------------------------+
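
For reference, a rough, untested sketch of the UDF idea (template, tokenRegex, and fillTemplate are hypothetical names; the whole row is passed to the UDF as a struct so tokens can be resolved by field name, a pattern whose support can vary by Spark version):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}
import scala.util.matching.Regex

val template = "Column.Name with Column.Color color" // would be the method parameter
val tokenRegex = """Column\.(\w+)""".r

// Replace each Column.<name> token with the value of field <name> from the row
val fillTemplate = udf { row: Row =>
  tokenRegex.replaceAllIn(template, m =>
    Regex.quoteReplacement(row.getAs[Any](m.group(1)).toString))
}

df.withColumn("Final", fillTemplate(struct(df.columns.map(col): _*)))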

Can someone help me with this problem, and also let me know whether I am thinking in the right direction for using Spark to handle large datasets?

Thanks!

Recommended Answer

If I understand your requirement correctly, you can create a method, say parseStatement, which takes a String-typed statement and returns a Column, via the following steps:

  1. Parse the input statement to count the number of tokens
  2. Generate a Regex pattern in the form of ^(.*?)(token1)(.*?)(token2) ... (.*?)$
  3. Apply pattern matching to assemble a colList consisting of lit(g1), col(g2), lit(g3), col(g4), ..., where the g's are the extracted Regex groups
  4. Concatenate the Column-type items with concat
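
For example, for the statement "Column.Name with Column.Color color" there are two tokens, so the generated pattern is ^(.*?)Column\.(\w+)(.*?)Column\.(\w+)(.*?)$; the extracted subgroups are "", "Name", " with ", "Color" and " color", which become lit(""), col("Name"), lit(" with "), col("Color") and lit(" color").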

Below is the sample code:

import spark.implicits._
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

def parseStatement(stmt: String): Column = {
  val token = "Column."
  val tokenPattern = """Column\.(\w+)"""
  val literalPattern = "(.*?)"
  // Step 1: count the tokens in the statement
  val colCount = stmt.sliding(token.length).count(_ == token)

  // Step 2: alternate literal and token patterns to build
  // (.*?)Column\.(\w+)(.*?)Column\.(\w+) ... (.*?)
  val pattern = (0 to colCount * 2).map {
    case i if i % 2 == 0 => literalPattern
    case _ => tokenPattern
  }.mkString

  // Steps 3 and 4: extract the Regex groups, turning even-indexed groups
  // (literals) into lit() and odd-indexed groups (column names) into col()
  val colList = ("^" + pattern + "$").r.findAllIn(stmt).
    matchData.toList.flatMap(_.subgroups).
    zipWithIndex.map {
      case (g, i) if i % 2 == 0 => lit(g)
      case (g, _) => col(g)
    }

  concat(colList: _*)
}

val df = Seq(
  (4, "Orange", "Orange"),
  (3, "Apple", "Green"),
  (1, "Apple", "Red"),
  (2, "Banana", "Yellow"),
  (5, "Apple", "Red")
).toDF("Number", "Name", "Color")

val statement = "Column.Name with Column.Color color"

df.withColumn("Final", parseStatement(statement)).
  show(false)
// +------+------+------+------------------------+
// |Number|Name  |Color |Final                   |
// +------+------+------+------------------------+
// |4     |Orange|Orange|Orange with Orange color|
// |3     |Apple |Green |Apple with Green color  |
// |1     |Apple |Red   |Apple with Red color    |
// |2     |Banana|Yellow|Banana with Yellow color|
// |5     |Apple |Red   |Apple with Red color    |
// +------+------+------+------------------------+

Note that concat takes Column-type parameters, hence the need for col() for column values and lit() for literals.
