Pandas dataframe to Spark dataframe "Can not merge type error"


Question

I have CSV data and created a Pandas dataframe with read_csv, forcing all columns to be read as strings. When I then try to create a Spark dataframe from the Pandas dataframe, I get the error message below.

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# Read every column as a string
z = pd.read_csv("mydata.csv", dtype=str)
z.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 74044003 entries, 0 to 74044002
Data columns (total 12 columns):
primaryid       object
event_dt        object
age             object
age_cod         object
age_grp         object
sex             object
occr_country    object
drug_seq        object
drugname        object
route           object
outc_cod        object
pt              object

q = sqlContext.createDataFrame(z)

File "<stdin>", line 1, in <module>
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 425, in createDataFrame
rdd, schema = self._createFromLocal(data, schema)
 File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 341, in _createFromLocal
struct = self._inferSchemaFromList(data)
 File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 241, in _inferSchemaFromList
schema = reduce(_merge_type, map(_infer_schema, data))
 File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 862, in _merge_type
for f in a.fields]
 File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 856, in _merge_type
raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
TypeError: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>

Here is an example. I am downloading public data and creating a Pandas dataframe, but Spark fails to create a Spark dataframe from it.

import io
import zipfile

import pandas as pd
import requests
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

url = "http://www.nber.org/fda/faers/2016/demo2016q1.csv.zip"

# Download the archive and extract it into the working directory
r = requests.get(url)
archive = zipfile.ZipFile(io.BytesIO(r.content))
archive.extractall()

z = pd.read_csv("demo2016q1.csv")  # creates pandas dataframe

Data_Frame = sqlContext.createDataFrame(z)

Recommended answer

Long story short: don't depend on schema inference. It is expensive and tricky in general. In particular, some columns in your data (for example event_dt_num) have missing values, which pushes Pandas to represent them as mixed types (strings for present values, NaN for missing ones). NaN is a float, so Spark infers DoubleType for some rows and StringType for others and cannot merge the two.
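To see the mixed types concretely, here is a minimal sketch on a tiny inline CSV. The sample values are made up for illustration; only the column names come from the data above. Even with dtype=str, Pandas leaves empty cells as NaN, so a row-by-row type inference meets strings in some rows and floats in others.

```python
import io

import pandas as pd

# Hypothetical two-row sample; the second row has no event_dt_num value
csv_text = "primaryid,event_dt_num\n1001,2016-01-05\n1002,\n"

z = pd.read_csv(io.StringIO(csv_text), dtype=str)

# Collect the Python type of every value in the column:
# present values are str, the missing one is NaN (a float)
value_types = {type(v).__name__ for v in z["event_dt_num"]}
print(value_types)
```

The same check on a real column is a quick way to find out which columns would trip up the inference.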

If you're in doubt, it is better to read all data as strings and cast afterwards. If you have access to the code book, you should always provide a schema to avoid problems and reduce the overall cost.
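As a rough illustration of "read as strings, cast afterwards", here is a sketch on the Pandas side with made-up sample values. The answer has Spark in mind, where the analogous step would be casting individual columns after loading; this block only shows the idea and is not the answer's own code.

```python
import io

import pandas as pd

# Hypothetical sample with a missing age in the second row
csv_text = "primaryid,age,sex\n1001,34,F\n1002,,M\n"

# Step 1: read everything as strings so that no type is guessed
z = pd.read_csv(io.StringIO(csv_text), dtype=str)

# Step 2: cast only the columns whose type is known in advance
z["primaryid"] = z["primaryid"].astype("int64")
z["age"] = z["age"].astype("float64")  # the missing value stays NaN

print(z.dtypes)
```

Casting explicitly keeps the decision about each column's type in your hands instead of leaving it to inference.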

Finally, passing data from the driver is an anti-pattern. You should be able to read this data directly using the csv format (Spark 2.0.0+) or the spark-csv library (Spark 1.6 and below):

df = (spark.read.format("csv").options(header="true")
    .load("/path/to/demo2016q1.csv"))

df.printSchema()

## root
##  |-- primaryid: string (nullable = true)
##  |-- caseid: string (nullable = true)
##  |-- caseversion: string (nullable = true)
##  |-- i_f_code: string (nullable = true)
##  |-- i_f_code_num: string (nullable = true)
##   ...
##  |-- to_mfr: string (nullable = true)
##  |-- occp_cod: string (nullable = true)
##  |-- reporter_country: string (nullable = true)
##  |-- occr_country: string (nullable = true)
##  |-- occp_cod_num: string (nullable = true)

In this particular case, adding the inferSchema="true" option should work as well, but it is still better to avoid it. You can also provide a schema as follows:

from pyspark.sql.types import StructType

schema = StructType.fromJson({'fields': [{'metadata': {},
   'name': 'primaryid',
   'nullable': True,
   'type': 'integer'},
  {'metadata': {}, 'name': 'caseid', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'caseversion', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'i_f_code', 'nullable': True, 'type': 'string'},
  {'metadata': {},
   'name': 'i_f_code_num',
   'nullable': True,
   'type': 'integer'},
  {'metadata': {}, 'name': 'event_dt', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'event_dt_num', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'mfr_dt', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'mfr_dt_num', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'init_fda_dt', 'nullable': True, 'type': 'integer'},
  {'metadata': {},
   'name': 'init_fda_dt_num',
   'nullable': True,
   'type': 'string'},
  {'metadata': {}, 'name': 'fda_dt', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'fda_dt_num', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'rept_cod', 'nullable': True, 'type': 'string'},
  {'metadata': {},
   'name': 'rept_cod_num',
   'nullable': True,
   'type': 'integer'},
  {'metadata': {}, 'name': 'auth_num', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'mfr_num', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'mfr_sndr', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'lit_ref', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'age', 'nullable': True, 'type': 'double'},
  {'metadata': {}, 'name': 'age_cod', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'age_grp', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'age_grp_num', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'sex', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'e_sub', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'wt', 'nullable': True, 'type': 'double'},
  {'metadata': {}, 'name': 'wt_cod', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'rept_dt', 'nullable': True, 'type': 'integer'},
  {'metadata': {}, 'name': 'rept_dt_num', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'to_mfr', 'nullable': True, 'type': 'string'},
  {'metadata': {}, 'name': 'occp_cod', 'nullable': True, 'type': 'string'},
  {'metadata': {},
   'name': 'reporter_country',
   'nullable': True,
   'type': 'string'},
  {'metadata': {}, 'name': 'occr_country', 'nullable': True, 'type': 'string'},
  {'metadata': {},
   'name': 'occp_cod_num',
   'nullable': True,
   'type': 'integer'}],
 'type': 'struct'})

and pass it directly to the reader:

(spark.read.schema(schema).format("csv").options(header="true")
    .load("/path/to/demo2016q1.csv"))
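Writing the schema dictionary out by hand is verbose. The sketch below builds the same layout from (name, type) pairs. make_schema_dict is a hypothetical helper introduced here for illustration; it is not part of PySpark. It uses plain Python only, and the call that turns the dictionary into a schema is left as a comment.

```python
def make_schema_dict(columns):
    """Build the dict layout used with StructType.fromJson from (name, type) pairs."""
    return {
        "type": "struct",
        "fields": [
            {"name": name, "type": type_name, "nullable": True, "metadata": {}}
            for name, type_name in columns
        ],
    }


# The first few columns of the schema shown above
columns = [
    ("primaryid", "integer"),
    ("caseid", "integer"),
    ("caseversion", "integer"),
    ("i_f_code", "string"),
]

schema_dict = make_schema_dict(columns)
print(schema_dict["fields"][0])

# With pyspark available, the dict can then be turned into a schema:
# from pyspark.sql.types import StructType
# schema = StructType.fromJson(schema_dict)
```

Keeping the column list as data also makes it easier to compare against the code book.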
