PySpark How to read CSV into Dataframe, and manipulate it
Question
I'm quite new to pyspark and am trying to use it to process a large dataset which is saved as a csv file. I'd like to read a CSV file into a spark dataframe, drop some columns, and add new columns. How should I do that?
I am having trouble getting this data into a dataframe. This is a stripped-down version of what I have so far:
def make_dataframe(data_portion, schema, sql):
    fields = data_portion.split(",")
    return sql.createDataFrame([(fields[0], fields[1])], schema=schema)

if __name__ == "__main__":
    sc = SparkContext(appName="Test")
    sql = SQLContext(sc)
    ...
    big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql)) \
        .reduce(lambda a, b: a.union(b))

    big_frame.write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshift://<...>") \
        .option("dbtable", "my_table_copy") \
        .option("tempdir", "s3n://path/for/temp/data") \
        .mode("append") \
        .save()

    sc.stop()
This produces the error TypeError: 'JavaPackage' object is not callable at the reduce step.
Is it possible to do this? The idea with reducing to a dataframe is to be able to write the resulting data to a database (Redshift, using the spark-redshift package).
I have also tried using unionAll(), and map() with partial(), but can't get it to work.
I am running this on Amazon's EMR, with spark-redshift_2.10:2.0.0, and Amazon's JDBC driver RedshiftJDBC41-1.1.17.1017.jar.
Answer
Update - also answering your question from the comments:
Read data from CSV into a dataframe: It seems that you are only trying to read a CSV file into a spark dataframe. If so, my answer here covers this: https://stackoverflow.com/a/37640154/5088142
The following code should read the CSV into a spark dataframe:
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sql = SQLContext(sc)

# Spark 1.x: load the CSV with the databricks spark-csv package
df = (sql.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("/path/to_csv.csv"))
These lines are equivalent in Spark 2.0, using SparkSession:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark.read.format("csv").option("header", "true").load("/path/to_csv.csv")
spark.read.option("header", "true").csv("/path/to_csv.csv")
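Either way, it helps to sanity-check what was loaded. A minimal sketch (reusing the path above; note that without the inferSchema option every column is read as a string):

df = spark.read.option("header", "true").csv("/path/to_csv.csv")
df.printSchema()  # without inferSchema, every column is typed as string
df.show(5)        # preview the first five rows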
Drop columns
You can drop a column using "drop(col)": https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
drop(col)
Returns a new DataFrame that drops the specified column.
Parameters: col – a string name of the column to drop, or a Column to drop.
>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]
>>> df.drop(df.age).collect()
[Row(name=u'Alice'), Row(name=u'Bob')]
>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
[Row(age=5, height=85, name=u'Bob')]
>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
[Row(age=5, name=u'Bob', height=85)]
Add columns: You can use "withColumn": https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html
withColumn(colName, col)
Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
Parameters:
colName – string, name of the new column.
col – a Column expression for the new column.
>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
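Combined with the helpers in pyspark.sql.functions, withColumn can derive a value from an existing column or add a constant. A brief sketch (the new column names are hypothetical):

from pyspark.sql.functions import col, lit

df = df.withColumn('age2', col('age') + 2) \
       .withColumn('source', lit('csv_import'))  # constant-valued column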
Note: spark has a lot of other functions which can be used (e.g. you can use "select" instead of "drop"). A full pipeline for your case is sketched below.
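Putting it together for your pipeline: the read, the column manipulation, and the Redshift write can all stay at the dataframe level, with no flatMap/reduce step. A sketch under the assumption that your CSV has a header row; the column names are hypothetical, and the URL and paths are the placeholders from your snippet:

import pyspark
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit

sc = pyspark.SparkContext(appName="Test")
sql = SQLContext(sc)

df = (sql.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("/path/to_csv.csv"))

# drop unwanted columns and add new ones before writing
df = df.drop('unwanted_col').withColumn('new_col', lit('some_value'))

df.write \
    .format("com.databricks.spark.redshift") \
    .option("url", "jdbc:redshift://<...>") \
    .option("dbtable", "my_table_copy") \
    .option("tempdir", "s3n://path/for/temp/data") \
    .mode("append") \
    .save()

sc.stop()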