Pyspark - saveAsTable throws index error while show() dataframe works perfectly
Question
Trying to save a dataframe as a table.
I am able to create the dataframe and create a temp table as well, but saving the same dataframe using saveAsTable() throws an index error.
I checked the schema of the dataframe, and it seems to be fine.
I'm not sure what the issue is, and couldn't get anything from the log other than the index error.
>>> sqlContext.sql('select * from bx_users limit 2').show()
+-------+--------------------+----+
|User-ID| Location| Age|
+-------+--------------------+----+
| 1| nyc, new york, usa|NULL|
| 2|stockton, califor...| 18|
+-------+--------------------+----+
>>> bx_users_df.show(2)
+-------+--------------------+----+
|User-ID| Location| Age|
+-------+--------------------+----+
| 1| nyc, new york, usa|NULL|
| 2|stockton, califor...| 18|
+-------+--------------------+----+
only showing top 2 rows
>>> bx_users_df.printSchema()
root
|-- User-ID: string (nullable = true)
|-- Location: string (nullable = true)
|-- Age: string (nullable = true)
>>> bx_users_df.write.format('parquet').mode('overwrite').saveAsTable('bx_user')
18/05/19 00:12:36 ERROR util.Utils: Aborting task
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "<stdin>", line 1, in <lambda>
IndexError: list index out of range
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:261)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1279)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
18/05/19 00:12:37 ERROR datasources.DefaultWriterContainer: Task attempt attempt_201805190012_0222_m_000000_0 aborted.
18/05/19 00:12:37 ERROR executor.Executor: Exception in task 0.0 in stage 222.0 (TID 245)
org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:269)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Answer
One of the biggest Spark gotchas is that operations are lazy: even when you call an action, Spark tries to do as little work as possible.
show(), for example, will try to evaluate only the first 20 rows - if there are no wide transformations in the pipeline, it won't process all of the data. This is why show() can work while saveAsTable() fails.
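The same effect can be reproduced in plain Python with a sketch (made-up data) mirroring the itertools.islice call visible in the traceback: taking only the first items of a lazy iterator never touches the element that would fail.

```python
import itertools

rows = ["1,a", "2,b", "broken"]  # the bad record sits past the first two rows

# Lazy pipeline: nothing is parsed until the iterator is consumed.
parsed = (line.split(",")[1] for line in rows)

# Like show(2): only the first 2 elements are evaluated, so no error is raised.
print(list(itertools.islice(parsed, 2)))  # ['a', 'b']

# Like saveAsTable(): consuming everything hits the malformed row.
try:
    list(line.split(",")[1] for line in rows)
except IndexError as e:
    print("IndexError:", e)  # list index out of range
```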
Your code fails in a lambda expression:
File "<stdin>", line 1, in <lambda>
resulting in:
IndexError: list index out of range
This is almost always a user error from not handling malformed data. I suspect your code contains something similar to:
(sc.textFile(...)
    .map(lambda line: line.split(...))
    .map(lambda xs: (xs[0], xs[1], xs[3])))
and your code fails when a line doesn't contain the expected number of fields.
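A minimal illustration of that failure, using hypothetical ';'-delimited records (the real file's layout may differ):

```python
# Hypothetical records: one complete, one truncated.
good = "1;nyc, new york, usa;NULL"
bad = "2"  # truncated record: only one field after splitting

extract = lambda xs: (xs[0], xs[1], xs[2])

print(extract(good.split(";")))  # 3 fields -> works fine
try:
    extract(bad.split(";"))      # 1 field -> xs[1] is out of range
except IndexError as e:
    print("IndexError:", e)      # list index out of range
```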
In general, prefer standard functions that handle possible exceptions, or use other methods to avoid failures.
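One way to guard your own parsing is a plain function that returns None for malformed lines; this is a sketch with assumed field names and delimiter, not the asker's actual code:

```python
def parse_user(line, sep=";", n_fields=3):
    """Return (user_id, location, age), or None if the line is malformed."""
    xs = line.split(sep)
    if len(xs) < n_fields:
        return None  # skip the record instead of raising IndexError
    return (xs[0], xs[1], xs[2])

# In Spark this would be used as:
#   sc.textFile(path).map(parse_user).filter(lambda r: r is not None)

records = ["1;nyc, new york, usa;NULL", "2;stockton;18", "garbage"]
parsed = [r for r in map(parse_user, records) if r is not None]
print(parsed)  # the malformed 'garbage' line is dropped
```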
And if you are just parsing a delimited data file (CSV, TSV), use the Spark CSV reader.