Add PySpark RDD as new column to pyspark.sql.dataframe
Question

I have a pyspark.sql.dataframe where each row is a news article. I then have an RDD that represents the words contained in each article. I want to add the RDD of words as a column named 'words' to my DataFrame of news articles. I tried

df.withColumn('words', words_rdd)

but I got the error
AssertionError: col should be Column
The DataFrame looks something like this:
Articles
the cat and dog ran
we went to the park
today it will rain
But I have 3k news articles.

I applied a function to clean the text, such as removing stop words, and I have an RDD that looks like this:
[[cat, dog, ran],[we, went, park],[today, will, rain]]
I'm trying to get my DataFrame to look like this:
Articles Words
the cat and dog ran [cat, dog, ran]
we went to the park [we, went, park]
today it will rain [today, will, rain]
Answer
Disclaimer:

A Spark DataFrame in general has no strictly defined order. Use at your own risk.

Add an index to the existing DataFrame:
from pyspark.sql.types import *

df_index = spark.createDataFrame(
    df.rdd.zipWithIndex(),
    StructType([StructField("data", df.schema), StructField("id", LongType())])
)
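To see what the wrapper schema looks like, you can inspect the result (a quick sketch; nullability flags and exact layout may vary slightly with your Spark version and df's schema):

df_index.printSchema()
# root
#  |-- data: struct (nullable = true)
#  |    |-- Articles: string (nullable = true)
#  |-- id: long (nullable = true)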
Add an index to the RDD and convert it to a DataFrame:
words_df = spark.createDataFrame(
    words_rdd.zipWithIndex(),
    StructType([
        StructField("words", ArrayType(StringType())),
        StructField("id", LongType())
    ])
)
Join both and select the required fields:
df_index.join(words_df, "id").select("data.*", "words")
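With the three sample articles from the question, the joined result should look roughly like this (a sketch only; remember that row order is not guaranteed):

df_index.join(words_df, "id").select("data.*", "words").show()
# +-------------------+-------------------+
# |           Articles|              words|
# +-------------------+-------------------+
# |the cat and dog ran|    [cat, dog, ran]|
# |we went to the park|   [we, went, park]|
# | today it will rain|[today, will, rain]|
# +-------------------+-------------------+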
Warning

There are different solutions which might work in specific cases, but they don't guarantee performance and/or correctness. These include:
- Using monotonically_increasing_id as a join key - in the general case not correct.
- Using the row_number() window function as a join key - unacceptable performance implications, and in general not correct if there is no specific order defined.
- Using zip on RDDs - can work if and only if both structures have the same data distribution (which should hold in this case); a sketch follows this list.
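For completeness, a minimal sketch of the zip-on-RDDs variant mentioned above (assuming, as zip requires, that df.rdd and words_rdd have the same number of partitions and the same number of elements per partition; the lambda and column names here are illustrative):

from pyspark.sql.types import *

zipped_df = spark.createDataFrame(
    # zip pairs the i-th row of df with the i-th word list
    df.rdd.zip(words_rdd).map(lambda pair: (pair[0].Articles, pair[1])),
    StructType([
        StructField("Articles", StringType()),
        StructField("words", ArrayType(StringType()))
    ])
)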
Note:

In this specific case you shouldn't need an RDD. pyspark.ml.feature provides a variety of Transformers, which should work well for you.
from pyspark.ml.feature import *
from pyspark.ml import Pipeline

df = spark.createDataFrame(
    ["the cat and dog ran", "we went to the park", "today it will rain"],
    "string"
).toDF("Articles")

Pipeline(stages=[
    RegexTokenizer(inputCol="Articles", outputCol="Tokens"),
    StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# | Articles| Tokens| Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...| [went, park]|
# | today it will rain|[today, it, will,...| [today, rain]|
# +-------------------+--------------------+---------------+
The list of stop words can be provided using the stopWords parameter of StopWordsRemover, for example:
StopWordsRemover(
    inputCol="Tokens",
    outputCol="Words",
    stopWords=["the", "and", "we", "to", "it"]
)
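If you'd rather not maintain the list by hand, StopWordsRemover also exposes loadDefaultStopWords, which returns the built-in list for a given language:

StopWordsRemover(
    inputCol="Tokens",
    outputCol="Words",
    # built-in English stop-word list shipped with Spark
    stopWords=StopWordsRemover.loadDefaultStopWords("english")
)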