Apache Spark fails to process a large Cassandra column family

Problem description

I am trying to use Apache Spark to process my large (~230k entries) Cassandra dataset, but I constantly run into different kinds of errors. However, I can successfully run the application on a dataset of ~200 entries. I have a Spark setup of 3 nodes with 1 master and 2 workers, and the 2 workers also host a Cassandra cluster in which the data is indexed with a replication factor of 2. My 2 Spark workers show 2.4 GB and 2.8 GB of memory on the web interface, and I set spark.executor.memory to 2409 when running an application, to get a combined memory of 4.7 GB. Here is my WebUI homepage:

The environment page of one of the tasks

At this stage, I am simply trying to process the data stored in Cassandra using Spark. Here is the basic code I am using to do this in Java:

// Imports assumed for the spark-cassandra-connector 1.4 Java API.
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.SparkContextJavaFunctions;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

// Point the connector at the Cassandra cluster and ship the application jars.
SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .setJars(jars);

SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
SparkContextJavaFunctions context = javaFunctions(sc);

// Expose the whole column family as an RDD of CassandraRow objects; count() scans all of it.
CassandraJavaRDD<CassandraRow> rdd = context.cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);

System.out.println(rdd.count());

For a successful run on a small dataset (200 entries), the events interface looks something like this:

But when I run the same thing on the large dataset (i.e. I change only CASSANDRA_COLUMN_FAMILY), the job never terminates in the terminal, and the log looks like this:

and after ~2 minutes, the stderr for the executors looks like this

and after ~7 minutes, I get

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

in my terminal, and I have to manually kill the SparkSubmit process. However, the large dataset was indexed from a binary file that occupies only 22 MB, and running nodetool status shows that only ~115 MB of data is stored across my two Cassandra nodes. I have also tried using Spark SQL on my dataset, but got similar results with that too. Where am I going wrong with my setup, and what should I do to successfully process my dataset, both with a transformation/action program and with one that uses Spark SQL?
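For reference, here is a minimal sketch of the Spark SQL variant mentioned above, assuming the DataFrame reader of Spark 1.4 together with the connector's "org.apache.spark.sql.cassandra" source, and reusing the constants from the snippet earlier in the question:

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

// Read the same column family through the connector's DataFrame source.
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", CASSANDRA_KEYSPACE)
        .option("table", CASSANDRA_COLUMN_FAMILY)
        .load();

// Counting still scans the whole table, so it hits the same memory pressure.
System.out.println(df.count());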

I have already tried the following methods:

  • Using -Xms1G -Xmx1G to increase memory, but the program fails with an exception saying that I should instead set spark.executor.memory, which I have.

  • Using spark.cassandra.input.split.size, which fails saying it isn't a valid option; a similar option is spark.cassandra.input.split.size_in_mb, which I set to 1, with no effect (see the sketch after this list).
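A minimal sketch of how that option can be passed on the SparkConf from the snippet above (the value 1 simply mirrors the attempt described in this list; it is illustrative only):

// Shrink the Cassandra input split size to 1 MB (the attempt described above).
conf.set("spark.cassandra.input.split.size_in_mb", "1");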

EDIT

Based on this answer, I have also tried the following methods:

  • Setting spark.storage.memoryFraction to 0.

  • Not setting spark.storage.memoryFraction to zero and using persist with MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK and MEMORY_AND_DISK_SER (see the sketch after this list).
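A minimal sketch of the persist attempt, assuming the CassandraJavaRDD from the snippet in the question (MEMORY_AND_DISK_SER is just one of the storage levels tried):

import org.apache.spark.storage.StorageLevel;

// Keep the rows serialized in memory and spill to disk when executors fill up,
// instead of recomputing the Cassandra scan for every action.
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER());
System.out.println(rdd.count());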

Versions:

  • Spark: 1.4.0

  • Cassandra: 2.1.6

  • spark-cassandra-connector: 1.4.0-M1

Answer

I think there is an issue in the latest spark-cassandra-connector. The parameter spark.cassandra.input.split.size_in_mb is supposed to have a default value of 64 MB, but it is being interpreted as 64 bytes in the code. This causes far too many partitions to be created, which Spark cannot schedule. Try setting the conf value to

spark.cassandra.input.split.size_in_mb=67108864
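A short sketch of one way to apply that value on the SparkConf used in the question (the same value can also be passed with --conf on spark-submit):

// Work around the default split size being read as bytes: pass 64 MB expressed in bytes.
SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .set("spark.cassandra.input.split.size_in_mb", "67108864")
        .setJars(jars);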
