Spark Local Mode - 所有作业仅使用一个CPU核心 [英] Spark Local Mode - all jobs only use one CPU core

查看:903
本文介绍了Spark Local Mode - 所有作业仅使用一个CPU核心的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们使用

local [*]

然而,使用New Relic工具进行性能分析和简单的顶级显示我们的16核心机器中只有一个CPU核心用于三种不同的Java spark我们编写的工作(我们也尝试过不同的AWS实例,但只使用了一个核心)。

However, profiling using New Relic tools and a simple 'top' show that only one CPU core of our 16 core machine is ever in use for three different Java spark jobs we've written (we've also tried different AWS instances but only one core is ever used).

Runtime.getRuntime()。 availableProcessors()报告16个处理器和
sparkContext.defaultParallelism()报告16个。

Runtime.getRuntime().availableProcessors() reports 16 processors and sparkContext.defaultParallelism() reports 16 as well.

我查看了各种Stackoverflow本地模式问题,但似乎没有解决问题。

I've looked at various Stackoverflow local mode issues but none seem to have resolved the issue.

任何建议都非常感谢。

谢谢

编辑:处理

1)使用sqlContext读取gzip压缩CSV文件1使用com.databricks.spark.csv从光盘(S3)到DataFrame DF1。

1) Use sqlContext to read gzipped CSV file 1 using com.databricks.spark.csv from disc (S3) into DataFrame DF1.

2)使用sqlContext使用com.databri读取gzip压缩文件2 cks.spark.csv从光盘(S3)到DataFrame DF2。

2) Use sqlContext to read gzipped CSV file 2 using com.databricks.spark.csv from disc (S3) into DataFrame DF2.

3)使用DF1.toJavaRDD()。mapToPair(返回元组的新映射函数)RDD1

3) Use DF1.toJavaRDD().mapToPair(new mapping function that returns a Tuple>) RDD1

4)使用DF2.toJavaRDD()。mapToPair(返回元组的新映射函数>)RDD2

4) Use DF2.toJavaRDD().mapToPair(new mapping function that returns a Tuple>) RDD2

5)在RDD上调用union

5) Call union on the RDDs

6)在unioned RDD上调用reduceByKey()为按键合并,所以有一个Tuple>)只有一个实例一个特定的键(因为同一个键出现在RDD1和RDD2中)。

6) Call reduceByKey() on the unioned RDDs to "merge by key" so have a Tuple>) with only one instance of a particular key (as the same key appears in both RDD1 and RDD2).

7)调用.values()。map(新的映射函数,迭代所有项目)提供的List并根据需要合并它们以返回相同或更小长度的List

7) Call .values().map(new mapping Function which iterates over all items in the provided List and merges them as required to return a List of the same or smaller length

8)调用.flatMap()获取RDD

8) Call .flatMap() to get an RDD

9)使用sqlContext从类型为DomainClass的平面地图创建一个DataFrame

9) Use sqlContext to create a DataFrame from the flat map of type DomainClass

10)使用DF.coalease(1).write ()将DF作为gzip压缩写入S3。

10) Use DF.coalease(1).write() to write the DF as gzipped CSV to S3.

推荐答案

我认为你的问题是你的CSV文件是拉开了。当Spark读取文件时,它会并行加载它们,但只有在文件编解码器可拆分*时才能执行此操作。普通(非gzip)文本和镶木地板是可拆分的,以及基因组学中使用的 bgzip 编解码器(我的字段)。您的整个文件最终都在一个分区中。

I think your problem is that your CSV files are gzipped. When Spark reads files, it loads them in parallel, but it can only do this if the file codec is splittable*. Plain (non-gzipped) text and parquet are splittable, as well as the bgzip codec used in genomics (my field). Your entire files are ending up in one partition each.

尝试解压缩csv.gz文件并再次运行。我想你会看到更好的结果!

Try decompressing the csv.gz files and running this again. I think you'll see much better results!


  • 可分割格式意味着如果给你一个任意的文件偏移量来开始阅读,您可以在块中找到下一条记录的开头并进行解释。 Gzipped文件不可拆分。

编辑:我在我的机器上复制了这种行为。在3G gzip压缩文本文件上使用 sc.textFile 生成1个分区。

I replicated this behavior on my machine. Using sc.textFile on a 3G gzipped text file produced 1 partition.

这篇关于Spark Local Mode - 所有作业仅使用一个CPU核心的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆