Pyspark - saveAsTable throws index error while show() dataframe works perfectly

Problem Description

I am trying to save a dataframe as a table.

I am able to create the dataframe and create a temp table as well, but saving the same dataframe using saveAsTable() throws an index error.

I checked the schema of the dataframe, and it seems to be OK.

I am not sure what the issue is, and I couldn't get anything from the log other than the index error.

>>> sqlContext.sql('select * from bx_users limit 2').show()
+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
+-------+--------------------+----+

>>> bx_users_df.show(2)
+-------+--------------------+----+
|User-ID|            Location| Age|
+-------+--------------------+----+
|      1|  nyc, new york, usa|NULL|
|      2|stockton, califor...|  18|
+-------+--------------------+----+
only showing top 2 rows

>>> bx_users_df.printSchema()
root
 |-- User-ID: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Age: string (nullable = true)

>>> bx_users_df.write.format('parquet').mode('overwrite').saveAsTable('bx_user')
18/05/19 00:12:36 ERROR util.Utils: Aborting task
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<stdin>", line 1, in <lambda>
IndexError: list index out of range

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)
    at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:261)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1279)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
18/05/19 00:12:37 ERROR datasources.DefaultWriterContainer: Task attempt attempt_201805190012_0222_m_000000_0 aborted.
18/05/19 00:12:37 ERROR executor.Executor: Exception in task 0.0 in stage 222.0 (TID 245)
org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:269)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Solution

One of the biggest Spark gotchas is that operations are lazy, and even when you call an action, Spark will try to do as little work as possible.

show, for example, will try to evaluate only the first 20 rows - if there are no wide transformations in the pipeline, it won't process all of the data. This is why show can work while saveAsTable fails.
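As a minimal sketch of that effect (the data, delimiter and output path below are made up, not the asker's actual pipeline): the malformed record sits at the end, past the few rows show() needs, so only the full write reaches it.

rows = ["1;nyc, new york, usa;NULL",
        "2;stockton, california, usa;18",
        "3;porto, portugal;17",
        "4;london, uk;33",
        "5;line-with-only-two-fields"]            # malformed: only 2 fields

parsed = (sc.parallelize(rows, 1)
            .map(lambda line: line.split(";"))
            .map(lambda xs: (xs[0], xs[1], xs[2])))   # xs[2] does not exist for the last line

df = sqlContext.createDataFrame(parsed, ["User-ID", "Location", "Age"])

df.show(2)                                            # evaluates only the first few rows - works
df.write.mode("overwrite").parquet("/tmp/bx_user_sketch")   # evaluates every row - IndexError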

Your code fails in a lambda expression:

File "<stdin>", line 1, in <lambda>

as a result:

IndexError: list index out of range

This is almost always a user mistake caused by a lack of handling for malformed data. I suspect your code contains something similar to:

(sc.textFile(...)
    .map(lambda line: line.split(...))
    .map(lambda xs: (xs[0], xs[1], xs[3])))

and your code fails when a line doesn't contain the expected number of fields.
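If that is what is happening, a quick check is to look at the lines that don't split into the expected number of fields; the path, delimiter and field count below are assumptions about the asker's input, not known values.

raw = sc.textFile("/path/to/BX-Users.csv")                 # hypothetical path
bad = raw.filter(lambda line: len(line.split(";")) != 3)   # assumed ';' delimiter and 3 fields

print(bad.count())          # how many malformed lines there are
for line in bad.take(5):    # inspect a few of them
    print(repr(line))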

In general, prefer standard functions that handle possible exceptions, or use other methods to avoid failures.
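One possible shape for that, sticking with plain RDD code (the parse helper, delimiter and field count are illustrative, not the asker's actual code), is to route every line through a function that drops records that are too short instead of indexing into them blindly.

def parse(line, sep=";", n_fields=3):
    xs = line.split(sep)
    if len(xs) < n_fields:
        return []                      # skip malformed records (or collect them for inspection)
    return [(xs[0], xs[1], xs[2])]

clean = sc.textFile("/path/to/BX-Users.csv").flatMap(parse)
users_df = sqlContext.createDataFrame(clean, ["User-ID", "Location", "Age"])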

And if you are just parsing a delimited data file (CSV, TSV), use the Spark CSV reader.
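For reference, a sketch of that approach: on Spark 1.x this goes through the external spark-csv package, while Spark 2+ has a built-in CSV reader; the path, delimiter and header option are assumptions about the asker's file.

df = (sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter", ";")
      .option("mode", "DROPMALFORMED")    # drop rows with the wrong number of fields instead of failing
      .load("/path/to/BX-Users.csv"))

# Spark 2+ built-in equivalent:
# df = spark.read.csv("/path/to/BX-Users.csv", sep=";", header=True, mode="DROPMALFORMED")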
