PySpark How to read CSV into Dataframe, and manipulate it


Question

I'm quite new to pyspark and am trying to use it to process a large dataset which is saved as a csv file. I'd like to read the CSV file into a Spark dataframe, drop some columns, and add new columns. How should I do that?

I am having trouble getting this data into a dataframe. This is a stripped-down version of what I have so far:

from pyspark import SparkContext
from pyspark.sql import SQLContext

def make_dataframe(data_portion, schema, sql):
    fields = data_portion.split(",")
    return sql.createDataFrame([(fields[0], fields[1])], schema=schema)

if __name__ == "__main__":
    sc = SparkContext(appName="Test")
    sql = SQLContext(sc)

    ...

    big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql)) \
                    .reduce(lambda a, b: a.union(b))

    big_frame.write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshift://<...>") \
        .option("dbtable", "my_table_copy") \
        .option("tempdir", "s3n://path/for/temp/data") \
        .mode("append") \
        .save()

    sc.stop()

This produces an error TypeError: 'JavaPackage' object is not callable at the reduce step.

Is it possible to do this? The idea with reducing to a dataframe is to be able to write the resulting data to a database (Redshift, using the spark-redshift package).

I have also tried using unionAll(), and map() with partial() but can't get it to work.

I am running this on Amazon's EMR, with spark-redshift_2.10:2.0.0, and Amazon's JDBC driver RedshiftJDBC41-1.1.17.1017.jar.

Answer

Update - also answering your question in the comments:

Read data from CSV into a dataframe: it seems that you are just trying to read a CSV file into a Spark dataframe.

If so - my answer here: https://stackoverflow.com/a/37640154/5088142 covers this.

The following code should read the CSV into a Spark dataframe:

import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sql = SQLContext(sc)

df = (sql.read
         .format("com.databricks.spark.csv")
         .option("header", "true")
         .load("/path/to_csv.csv"))

# these lines are equivalent in Spark 2.0 - using SparkSession
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark.read.format("csv").option("header", "true").load("/path/to_csv.csv") 
spark.read.option("header", "true").csv("/path/to_csv.csv")
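
As a side note, both readers load every column as a string by default; you can ask Spark to infer column types instead. A minimal sketch, assuming the Spark 2.0 session created above and the same hypothetical file path:

# Hypothetical sketch: infer column types instead of reading everything as strings
df = (spark.read
           .option("header", "true")
           .option("inferSchema", "true")  # extra pass over the data to guess types
           .csv("/path/to_csv.csv"))
df.printSchema()  # inspect the inferred schema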

drop column

You can drop a column using "drop(col)": https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

drop(col)

Returns a new DataFrame that drops the specified column.
Parameters: col – a string name of the column to drop, or a Column to drop.

>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.drop(df.age).collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
[Row(age=5, height=85, name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
[Row(age=5, name=u'Bob', height=85)]
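
Tying this back to the CSV dataframe loaded above, a minimal sketch (unwanted_col is a hypothetical placeholder for one of your CSV headers):

# Hypothetical sketch: remove a column from the CSV-backed dataframe
df = spark.read.option("header", "true").csv("/path/to_csv.csv")
df_trimmed = df.drop("unwanted_col")  # returns a new DataFrame; df itself is unchanged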

add column

You can use "withColumn": https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

withColumn(colName, col)

Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
Parameters: 

    colName – string, name of the new column.
    col – a Column expression for the new column.

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
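
For the CSV case, a minimal sketch of adding a derived column (price and qty are hypothetical column names; the casts are needed because the csv reader loads everything as strings unless a schema is supplied or inferred):

from pyspark.sql.functions import col

# Hypothetical sketch: add a computed column to the CSV-backed dataframe
df_with_total = df_trimmed.withColumn(
    "total", col("price").cast("double") * col("qty").cast("double"))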

Note: Spark has a lot of other functions which can be used (e.g. you can use "select" instead of "drop").
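
A minimal sketch of the "select" alternative, keeping the wanted columns instead of dropping the unwanted ones (column names again hypothetical):

# Hypothetical sketch: select is the mirror image of drop
df_selected = df.select("name", "age")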
