SQL query in Spark/Scala: Size exceeds Integer.MAX_VALUE

Problem description

I am trying to run a simple SQL query on S3 events using Spark. I am loading ~30GB of JSON files as follows:

val d2 = spark.read.json("s3n://myData/2017/02/01/1234");
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK);
d2.registerTempTable("d2");

Then I am trying to write the result of my query to a file:

val users_count = sql("select count(distinct data.user_id) from d2");
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv");

But Spark is throwing the following exception:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Note that the same query works for smaller amounts of data. What's the problem here?

Answer

No Spark shuffle block can be larger than 2GB (Integer.MAX_VALUE bytes) so you need more / smaller partitions.

You should adjust spark.default.parallelism and spark.sql.shuffle.partitions (default 200) so that the number of partitions can accommodate your data without reaching the 2GB limit (you could aim for roughly 256MB per partition, so for 200GB you would get 800 partitions). Thousands of partitions are very common, so don't be afraid to repartition to 1000 as suggested.
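
As a rough illustration only (not part of the original answer), the settings and repartitioning could look like this in spark-shell, assuming Spark 2.x, the spark session, and the d2 DataFrame from the question; the 800-partition figure is just the example target mentioned above:

// Hypothetical sketch: raise the shuffle partition count and spread the cached data
// over more partitions so no single shuffle block approaches the 2GB limit.
spark.conf.set("spark.sql.shuffle.partitions", "800")       // default is 200
// spark.default.parallelism is normally set at submit time, e.g.
//   spark-submit --conf spark.default.parallelism=800 ...

val d2r = d2.repartition(800)                                // aim for ~256MB per partition
d2r.createOrReplaceTempView("d2")                            // Spark 2.x replacement for registerTempTable

val users_count = spark.sql("select count(distinct data.user_id) from d2")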

FYI, you can check the number of partitions of an RDD with rdd.getNumPartitions (e.g. d2.rdd.getNumPartitions).
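
For example, a quick check before and after an explicit repartition, using the d2 DataFrame from the question (the counts you see will depend on your data and cluster):

d2.rdd.getNumPartitions                      // partitions of the loaded/cached data
d2.repartition(800).rdd.getNumPartitions     // 800 after an explicit repartition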

There is an issue tracking the effort to address the various 2GB limits (it has been open for a while now): https://issues.apache.org/jira/browse/SPARK-6235

See http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/25 for more info on this error.
