Pyspark - saveAsTable throws index error while show() dataframe works perfectly

Question

I am trying to save a dataframe as a table.

I am able to create the dataframe and a temp table as well, but saving the same dataframe using saveAsTable() throws an index error.

I checked the schema of the dataframe, and it seems to be fine.

I am not sure what the issue is, and I couldn't get anything from the log other than the index error.

>>> sqlContext.sql('select * from bx_users limit 2').show()
+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
+-------+--------------------+----+

>>> bx_users_df.show(2)
+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
+-------+--------------------+----+
only showing top 2 rows

>>> bx_users_df.printSchema()
root
 |-- User-ID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)

>>> bx_users_df.write.format('parquet').mode('overwrite').saveAsTable('bx_user')
18/05/19 00:12:36 ERROR util.Utils: Aborting task
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<stdin>", line 1, in <lambda>
IndexError: list index out of range

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:261)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1279)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
18/05/19 00:12:37 ERROR datasources.DefaultWriterContainer: Task attempt attempt_201805190012_0222_m_000000_0 aborted.
18/05/19 00:12:37 ERROR executor.Executor: Exception in task 0.0 in stage 222.0 (TID 245)
org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:269)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Answer

One of the biggest Spark gotchas is that operations are lazy: even when you call an action, Spark will try to do as little work as possible.

show, for example, will try to evaluate only the first 20 rows; if there are no wide transformations in the pipeline, it won't process all of the data. This is why show can work while saveAsTable fails.
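Here is a minimal sketch of that effect (the data and names are made up for illustration, not taken from your job): a malformed line only breaks the pipeline once Spark is forced to evaluate every row.

rdd = sc.parallelize(['1,nyc,35', '2,stockton'])   # second line is missing a field
df = (rdd
      .map(lambda line: line.split(','))
      .map(lambda xs: (xs[0], xs[1], xs[2]))        # indexes past the end of the short line
      .toDF(['User-ID', 'Location', 'Age']))

df.show(1)   # may well succeed - only a handful of rows are evaluated
df.write.format('parquet').mode('overwrite').saveAsTable('t')   # full pass over the data, hits IndexError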

Your code fails in the lambda expression:

File "<stdin>", line 1, in <lambda>

with:

IndexError: list index out of range

This is almost always a user mistake caused by not handling malformed data. I suspect your code contains something similar to

(sc.textFile(...)
    .map(lambda line: line.split(...))
    .map(lambda xs: (xs[0], xs[1], xs[3])))

and it fails when a line doesn't contain the expected number of fields.

In general, prefer standard functions that handle possible exceptions, or use other methods to avoid failures.
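For example, a minimal sketch of a more defensive version of such a pipeline (the file path, the delimiter, and the column indices below are assumptions for illustration; adjust them to your data):

def parse(line):
    # Keep only well-formed lines; anything else is dropped instead of crashing the job.
    xs = line.split(';')                       # assumed delimiter
    return (xs[0], xs[1], xs[2]) if len(xs) >= 3 else None

bx_users_df = (sc.textFile('BX-Users.csv')     # hypothetical path
                 .map(parse)
                 .filter(lambda row: row is not None)
                 .toDF(['User-ID', 'Location', 'Age']))

This keeps the failure mode explicit: malformed rows are filtered out up front instead of surfacing as an IndexError deep inside a write.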

And if you are just parsing a delimited data file (CSV, TSV), use the Spark CSV reader.
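A sketch of what that could look like, depending on your Spark version (the header and the ';' delimiter are assumptions about the file, and the path is hypothetical):

# Spark 2.x and later: the CSV source is built in
bx_users_df = (spark.read
               .option('header', 'true')
               .option('sep', ';')
               .csv('BX-Users.csv'))

# Spark 1.x: use the external spark-csv package
# (e.g. started with --packages com.databricks:spark-csv_2.10:1.5.0)
bx_users_df = (sqlContext.read
               .format('com.databricks.spark.csv')
               .option('header', 'true')
               .option('delimiter', ';')
               .load('BX-Users.csv'))

Both readers also accept .option('mode', 'DROPMALFORMED') to drop rows with the wrong number of columns instead of failing the job.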
