pyspark:TypeError:IntegerType无法接受< type'unicode'>类型的对象. [英] pyspark: TypeError: IntegerType can not accept object in type <type 'unicode'>
问题描述
在Spark集群上使用pyspark编程, 数据又大又零碎,因此无法轻松地加载到内存中或检查数据的完整性
programming with pyspark on a Spark cluster, the data is large and in pieces so can not be loaded into the memory or check the sanity of the data easily
基本上看起来像
af.b Current%20events 1 996
af.b Kategorie:Musiek 1 4468
af.b Spesiaal:RecentChangesLinked/Gebruikerbespreking:Freakazoid 1 5209
af.b Spesiaal:RecentChangesLinked/Sir_Arthur_Conan_Doyle 1 5214
维基百科数据:
我从AWS S3读取了它,然后尝试在pyspark intepreter中使用以下python代码构建spark Dataframe:
I read it from aws S3 and then try to construct spark Dataframe with the following python code in pyspark intepreter:
parts = data.map(lambda l: l.split())
wikis = parts.map(lambda p: (p[0], p[1],p[2],p[3]))
fields = [StructField("project", StringType(), True),
StructField("title", StringType(), True),
StructField("count", IntegerType(), True),
StructField("byte_size", StringType(), True)]
schema = StructType(fields)
df = sqlContext.createDataFrame(wikis, schema)
一切都很好,只有createDataFrame给我错误
all look fine, only createDataFrame gives me error
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame
rdd, schema = self._createFromRDD(data, schema, samplingRatio)
File "/usr/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD
_verify_type(row, schema)
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1152, in _verify_type
_verify_type(v, f.dataType)
File "/usr/lib/spark/python/pyspark/sql/types.py", line 1136, in _verify_type
raise TypeError("%s can not accept object in type %s" % (dataType, type(obj)))
TypeError: IntegerType can not accept object in type <type 'unicode'>
为什么我不能设置应计入IntegerType的第三列? 我该如何解决?
why I can not set the third column which should be count to IntegerType ? How can I solve this ?
推荐答案
如 ccheneson 所述,您传递了错误的类型.
As noted by ccheneson you pass wrong types.
假设您data
看起来像这样:
data = sc.parallelize(["af.b Current%20events 1 996"])
在第一张地图之后,您会得到RDD[List[String]]
:
After the first map you get RDD[List[String]]
:
parts = data.map(lambda l: l.split())
parts.first()
## ['af.b', 'Current%20events', '1', '996']
第二个映射将其转换为元组(String, String, String, String)
:
The second map converts it to tuple (String, String, String, String)
:
wikis = parts.map(lambda p: (p[0], p[1], p[2],p[3]))
wikis.first()
## ('af.b', 'Current%20events', '1', '996')
您的schema
指出第3列是整数:
Your schema
states that 3rd columns is an integer:
[f.dataType for f in schema.fields]
## [StringType, StringType, IntegerType, StringType]
模式最常用于避免进行全表扫描以推断类型,并且不执行任何类型转换.
Schema is used most to avoid a full table scan to infer types and doesn't perform any type casting.
您可以在上一张地图中投射数据:
You can either cast your data during last map:
wikis = parts.map(lambda p: (p[0], p[1], int(p[2]), p[3]))
或将count
定义为StringType
并强制转换列
Or define count
as a StringType
and cast column
fields[2] = StructField("count", StringType(), True)
schema = StructType(fields)
wikis.toDF(schema).withColumn("cnt", col("count").cast("integer")).drop("count")
在侧面说明中,count
是SQL中的保留字,不应用作列名.在Spark中,它将在某些情况下按预期工作,而在其他情况下会失败.
On a side note count
is reserved word in SQL and shouldn't be used as a column name. In Spark it will work as expected in some contexts and fail in another.
这篇关于pyspark:TypeError:IntegerType无法接受< type'unicode'>类型的对象.的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!