Add PySpark RDD as new column to pyspark.sql.dataframe


Question

I have a pyspark.sql.dataframe where each row is a news article. I then have an RDD that represents the words contained in each article. I want to add the RDD of words as a column named 'words' to my dataframe of news articles. I tried

df.withColumn('words', words_rdd )

but I got the error

AssertionError: col should be Column

The DataFrame looks something like this

Articles
the cat and dog ran
we went to the park
today it will rain

But I have 3k news articles.

I applied a function to clean the text, such as removing stop words, and I have an RDD that looks like this:

[[cat, dog, ran],[we, went, park],[today, will, rain]]

I'm trying to get my DataFrame to look like this:

Articles                 Words
the cat and dog ran      [cat, dog, ran]
we went to the park      [we, went, park]
today it will rain       [today, will, rain]

Answer

Disclaimer:

A Spark DataFrame in general has no strictly defined order. Use at your own risk.

Add an index to the existing DataFrame:

from pyspark.sql.types import *

# Wrap each original Row in a "data" struct column and pair it with a
# sequential index produced by zipWithIndex.
df_index = spark.createDataFrame(
    df.rdd.zipWithIndex(),
    StructType([StructField("data", df.schema), StructField("id", LongType())])
)

Add an index to the RDD and convert it to a DataFrame:

# Pair each cleaned word list with the same kind of sequential index.
words_df = spark.createDataFrame(
    words_rdd.zipWithIndex(),
    StructType([
        StructField("words", ArrayType(StringType())),
        StructField("id", LongType())
    ])
)

Join both and select the required fields:

df_index.join(words_df, "id").select("data.*", "words")
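
With the sample data from the question, the joined result should look roughly like this (a sketch; row order is not guaranteed):

df_index.join(words_df, "id").select("data.*", "words").show()
# +-------------------+-------------------+
# |           Articles|              words|
# +-------------------+-------------------+
# |the cat and dog ran|    [cat, dog, ran]|
# |we went to the park|   [we, went, park]|
# | today it will rain|[today, will, rain]|
# +-------------------+-------------------+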

Warning

There are different solutions which might work in specific cases, but they don't guarantee performance and/or correctness. These include:

  • Using monotonically_increasing_id as a join key - not correct in the general case.
  • Using the row_number() window function as a join key - unacceptable performance implications, and in general not correct if there is no specific ordering defined.
  • Using zip on RDDs - can work if and only if both structures have the same data distribution (it should work in this case); see the sketch after this list.
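
To make the last point concrete, a zip-based variant might look like the sketch below. This is illustrative only: RDD.zip requires both RDDs to have the same number of partitions and the same number of elements per partition, which should hold here only because words_rdd was derived from df itself.

from pyspark.sql import Row

# Sketch: pair each article Row with its cleaned word list.
# This fails at runtime if the two RDDs are not distributed identically.
zipped = df.rdd.zip(words_rdd).map(
    lambda pair: Row(Articles=pair[0]["Articles"], Words=pair[1])
)
df_zipped = spark.createDataFrame(zipped)
df_zipped.show()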

Note:

In this specific case you shouldn't need an RDD at all. pyspark.ml.feature provides a variety of Transformers which should work well for you.

from pyspark.ml.feature import *
from pyspark.ml import Pipeline

df = spark.createDataFrame(
    ["the cat and dog ran", "we went to the park", "today it will rain"],
    "string"
).toDF("Articles")

Pipeline(stages=[
    RegexTokenizer(inputCol="Articles", outputCol="Tokens"), 
    StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# |           Articles|              Tokens|          Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...|   [went, park]|
# | today it will rain|[today, it, will,...|  [today, rain]|
# +-------------------+--------------------+---------------+

The list of stop words can be provided using the stopWords parameter of StopWordsRemover, for example:

StopWordsRemover(
    inputCol="Tokens",
    outputCol="Words",
    stopWords=["the", "and", "we", "to", "it"]
)
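
As a rough sketch, the customized remover can be dropped into the same pipeline in place of the default one (note that with this particular list, "we" is also treated as a stop word):

Pipeline(stages=[
    RegexTokenizer(inputCol="Articles", outputCol="Tokens"),
    StopWordsRemover(
        inputCol="Tokens",
        outputCol="Words",
        stopWords=["the", "and", "we", "to", "it"]
    )
]).fit(df).transform(df).select("Articles", "Words").show()
# +-------------------+-------------------+
# |           Articles|              Words|
# +-------------------+-------------------+
# |the cat and dog ran|    [cat, dog, ran]|
# |we went to the park|       [went, park]|
# | today it will rain|[today, will, rain]|
# +-------------------+-------------------+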
