Appending column name to column value using Spark


Question

I have data in a comma-separated file, which I have loaded into a Spark data frame. The data looks like:

  A B C
  1 2 3
  4 5 6
  7 8 9

I want to transform the above data frame using pyspark as:

   A    B   C
  A_1  B_2  C_3
  A_4  B_5  C_6
  --------------

Then convert it to a list of lists using pyspark:

[[ A_1 , B_2 , C_3],[A_4 , B_5 , C_6]]

And then run the FP Growth algorithm on the above data set using pyspark.

The code I have tried so far:

from pyspark.sql.functions import col, size
from pyspark.sql.functions import *
import pyspark.sql.functions as func
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import Row
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark import SparkConf
from pyspark.sql import SQLContext  # SQLContext lives in pyspark.sql, not the top-level pyspark package

sqlContext = SQLContext(sc)  # sc is the SparkContext already available in the notebook
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/tables/data.csv")

names = df.schema.names

Then I thought of doing something inside a for loop:

 for name in names:
      -----
      ------
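
One shape such a loop could take, sketched here as an assumption about the intent (prefix every value with its column name and an underscore, using the functions already imported above):

# Sketch: rewrite each column so that, for example, 1 in column A becomes "A_1".
for name in names:
    df = df.withColumn(name, concat(lit(name), lit("_"), col(name)))

The answer below does the same transformation with reduce instead of an explicit loop.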

After this I will be using FPGrowth:

df = spark.createDataFrame([
    (0, ["A_1", "B_2", "C_3"]),
    (1, ["A_4", "B_5", "C_6"])], ["id", "items"])

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(df)
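
For what it is worth, the fitted model exposes its results as DataFrames; a minimal sketch of inspecting them, reusing the names from the snippet above:

# Frequent itemsets and association rules mined by FP-Growth.
model.freqItemsets.show(truncate=False)
model.associationRules.show(truncate=False)

# transform() appends a prediction column with the consequents for each row's items.
model.transform(df).show(truncate=False)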

Answer

A number of concepts here for those who normally use Scala, showing how to do the same with pyspark. Somewhat different, but instructive for sure, although for how many people is the big question. I certainly learnt something about zipWithIndex on pyspark myself. Anyway.

First part is to get the data into the desired format; probably too many imports, but leaving them as is:

from functools import reduce
from pyspark.sql.functions import lower, col, lit, concat, split
from pyspark.sql.types import * 
from pyspark.sql import Row
from pyspark.sql import functions as f

source_df = spark.createDataFrame(
    [
        (1, 11, 111),
        (2, 22, 222)
    ],
    ["colA", "colB", "colC"]
)

intermediate_df = reduce(
    lambda df, col_name: df.withColumn(col_name, concat(lit(col_name), lit("_"), col(col_name))),
    source_df.columns,
    source_df
)

allCols = [x for x in intermediate_df.columns]
result_df = intermediate_df.select(f.concat_ws(',', *allCols).alias('CONCAT_COLS'))

result_df = result_df.select(split(col("CONCAT_COLS"), r",\s*").alias("ARRAY_COLS"))

# Add 0,1,2,3, ... with zipWithIndex; it is appended at the back, but that does not matter, you can move it around.
# Build the new schema: the existing fields (one in this case, but done flexibly) plus the zipWithIndex value.
schema = StructType(result_df.schema.fields[:] + [StructField("index", LongType(), True)])

# Need this dict approach with pyspark, different to Scala.
rdd = result_df.rdd.zipWithIndex()
rdd1 = rdd.map(
               lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],)
              )

final_result_df = spark.createDataFrame(rdd1, schema)
final_result_df.show(truncate=False)

This returns:

 +---------------------------+-----+
 |ARRAY_COLS                 |index|
 +---------------------------+-----+
 |[colA_1, colB_11, colC_111]|0    |
 |[colA_2, colB_22, colC_222]|1    |
 +---------------------------+-----+

Second part is the old zipWithIndex with pyspark, needed if you want the 0,1,.. index. Painful compared to Scala.

In general this is easier to solve in Scala.

Not sure on performance; it is not a foldLeft, which is interesting. I think it is OK actually.
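
From here the array column can be fed straight into FPGrowth; a minimal sketch, assuming the index column is not needed for the mining itself and reusing the thresholds from the question:

from pyspark.ml.fpm import FPGrowth

# Fit FP-Growth directly on the array column built above.
fpGrowth = FPGrowth(itemsCol="ARRAY_COLS", minSupport=0.5, minConfidence=0.6)
model = fpGrowth.fit(result_df)
model.freqItemsets.show(truncate=False)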
