PySpark: How to read CSV into a DataFrame, and manipulate it
Question
I'm quite new to PySpark and am trying to use it to process a large dataset which is saved as a CSV file. I'd like to read the CSV file into a Spark DataFrame, drop some columns, and add new columns. How should I do that?
I am having trouble getting this data into a DataFrame. This is a stripped-down version of what I have so far:
def make_dataframe(data_portion, schema, sql):
    fields = data_portion.split(",")
    return sql.createDataFrame([(fields[0], fields[1])], schema=schema)

if __name__ == "__main__":
    sc = SparkContext(appName="Test")
    sql = SQLContext(sc)
    ...
    big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql)) \
                    .reduce(lambda a, b: a.union(b))

    big_frame.write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshift://<...>") \
        .option("dbtable", "my_table_copy") \
        .option("tempdir", "s3n://path/for/temp/data") \
        .mode("append") \
        .save()

    sc.stop()
This produces an error TypeError: 'JavaPackage' object is not callable at the reduce step.
Is it possible to do this? The idea behind reducing to a DataFrame is to be able to write the resulting data to a database (Redshift, using the spark-redshift package).
I have also tried using unionAll() and map() with partial(), but can't get it to work.
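For what it's worth, the reduce-to-one-collection pattern being attempted here can be expressed with functools.reduce. The sketch below uses plain Python lists as stand-ins for DataFrames so it runs without a Spark cluster; union_all is a hypothetical helper name, not a Spark API:

from functools import reduce

def union_all(frames):
    # Fold a sequence of "frames" into one. With real DataFrames built
    # on the driver, the lambda would call a.unionAll(b) instead of
    # list concatenation.
    return reduce(lambda a, b: a + b, frames)

parts = [[("a", 1)], [("b", 2)], [("c", 3)]]
combined = union_all(parts)
print(combined)  # → [('a', 1), ('b', 2), ('c', 3)]

The key difference from the code above is that the fold runs on the driver over already-built frames, rather than trying to construct DataFrames inside a flatMap on the workers.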
I am running this on Amazon's EMR, with spark-redshift_2.10:2.0.0 and Amazon's JDBC driver RedshiftJDBC41-1.1.17.1017.jar.
Answer
Update - also answering your question in the comments:
Read data from CSV into a DataFrame: It seems that you are only trying to read a CSV file into a Spark DataFrame. If so, my answer here: https://stackoverflow.com/a/37640154/5088142 covers this.
The following code should read a CSV into a Spark DataFrame:
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sql = SQLContext(sc)
df = (sql.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("/path/to_csv.csv"))
# These lines are equivalent in Spark 2.0, using SparkSession:
from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("Python Spark SQL basic example")
         .config("spark.some.config.option", "some-value")
         .getOrCreate())

spark.read.format("csv").option("header", "true").load("/path/to_csv.csv")
spark.read.option("header", "true").csv("/path/to_csv.csv")
Drop a column
You can drop a column using "drop(col)": https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
drop(col)
Returns a new DataFrame that drops the specified column.
Parameters: col – a string name of the column to drop, or a Column to drop.
>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]
>>> df.drop(df.age).collect()
[Row(name=u'Alice'), Row(name=u'Bob')]
>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
[Row(age=5, height=85, name=u'Bob')]
>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
[Row(age=5, name=u'Bob', height=85)]
Add a column
You can use "withColumn": https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
withColumn(colName, col)
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
Parameters:
colName – string, name of the new column.
col – a Column expression for the new column.
>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
Note: Spark has a lot of other functions which can be used (e.g. you can use "select" instead of "drop").
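Putting the pieces together, a minimal end-to-end sketch of what the question asks for (read, drop a column, add a column) might look like the following. This is only an illustration: an in-memory DataFrame built with createDataFrame stands in for the spark.read...csv(...) call so the example runs without a file on disk, and the column names are made up:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[1]")
         .appName("csv-demo")
         .getOrCreate())

# Stand-in for: spark.read.option("header", "true").csv("/path/to_csv.csv")
df = spark.createDataFrame(
    [("Alice", 2, "x"), ("Bob", 5, "y")],
    ["name", "age", "unused"])

result = (df.drop("unused")                   # remove a column
            .withColumn("age2", df.age + 2))  # add a derived column

result.show()
spark.stop()

From here, result.write with the spark-redshift format options shown in the question would push the transformed frame to Redshift.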