如何将数据从大 pandas 数据帧加载到Spark数据帧 [英] How to load data in chunks from a pandas dataframe to a spark dataframe

查看:95
本文介绍了如何将数据从大 pandas 数据帧加载到Spark数据帧的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经使用类似这样的方式通过pyodbc连接读取了大块数据:

I have read data in chunks over a pyodbc connection using something like this :

import pandas as pd
import pyodbc
conn = pyodbc.connect("Some connection Details")
sql = "SELECT * from TABLES;"
df1 = pd.read_sql(sql,conn,chunksize=10)

现在我想使用类似的东西将所有这些块读入一个单一的spark数据帧中:

Now I want to read all these chunks into one single spark dataframe using something like:

i = 0
for chunk in df1:
    if i==0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2.unionAll(sqlContext.createDataFrame(chunk))
    i = i+1

问题是当我执行df2.count()时我得到的结果为10,这意味着只有i = 0的情况在起作用,这是unionAll的错误.我在这里做错什么了吗?

The problem is when i do a df2.count() i get the result as 10 which means only the i=0 case is working.Is this a bug with unionAll. Am i doing something wrong here??

推荐答案

此外,您可以改为使用 enumerate() 以避免自己管理i变量:

Furthermore you can instead use enumerate() to avoid having to manage the i variable yourself:

for i,chunk in enumerate(df1):
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.unionAll(sqlContext.createDataFrame(chunk))

此外,.unionAll()的文档指出已弃用.unionAll(),现在您应该使用

Furthermore the documentation for .unionAll() states that .unionAll() is deprecated and now you should use .union() which acts like UNION ALL in SQL:

for i,chunk in enumerate(df1):
    if i == 0:
        df2 = sqlContext.createDataFrame(chunk)
    else:
        df2 = df2.union(sqlContext.createDataFrame(chunk))


此外,我将不再继续说,但在我进一步说之前,不要再说了:正如@ zero323所说的,我们不要在循环中使用.union().让我们做类似的事情:


Furthermore I'll stop saying furthermore but not before I say furthermore: As @zero323 says let's not use .union() in a loop. Let's instead do something like:

def unionAll(*dfs):
    ' by @zero323 from here: http://stackoverflow.com/a/33744540/42346 '
    first, *rest = dfs  # Python 3.x, for 2.x you'll have to unpack manually
    return first.sql_ctx.createDataFrame(
        first.sql_ctx._sc.union([df.rdd for df in dfs]),
        first.schema
    )

df_list = []
for chunk in df1:
    df_list.append(sqlContext.createDataFrame(chunk))

df_all = unionAll(df_list)

这篇关于如何将数据从大 pandas 数据帧加载到Spark数据帧的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆