Spark Java PCA: Java Heap Space and Missing output location for shuffle


Question

I am trying to do a PCA on a dataframe with 4,827 rows and 40,107 columns, but I get a Java heap space error and a missing output location for shuffle (according to the stderr files on the executors). The error occurs during the "treeAggregate at RowMatrix.scala:122" stage of the PCA.

Cluster

It is a standalone cluster with 16 worker nodes, each with one executor that has 4 cores and 21,504 MB of memory. The master node has 15 GB of memory, which I give with "java -jar -Xmx15g myapp.jar". Also, "spark.sql.shuffle.partitions" is 192 and "spark.driver.maxResultSize" is 6g.
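
For reference, a minimal sketch (not from the original post) of how those two settings could be applied when the session is built, assuming they are set programmatically rather than in spark-defaults.conf; the application name is a placeholder:

import org.apache.spark.sql.SparkSession;

// Sketch only: the driver heap itself is still governed by the -Xmx flag used to launch the jar.
SparkSession sp = SparkSession.builder()
        .appName("pca-app")                             // placeholder name
        .config("spark.sql.shuffle.partitions", "192")
        .config("spark.driver.maxResultSize", "6g")
        .getOrCreate();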

Simplified code

df1.persist (From the Storage tab in the Spark UI it says it is 3 GB)
df2=df1.groupby(col1).pivot(col2).mean(col3) (This is a df with 4,827 columns and 40,107 rows)
df2.collectFirstColumnAsList
df3=df1.groupby(col2).pivot(col1).mean(col3) (This is a df with 40,107 columns and 4,827 rows)

-----it hangs here for around 1.5 hours creating metadata for upcoming dataframe-----

df4 = (..Imputer or na.fill on df3..)
df5 = (..VectorAssembler on df4..)
(..PCA on df5 with error Missing output location for shuffle..)
df1.unpersist
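
In Spark's Java API the pipeline above corresponds roughly to the following sketch (col1/col2/col3 are the placeholder column names from the pseudocode, so this is illustrative rather than the exact original code):

// Rough Java equivalent of the simplified pipeline above.
df1.persist();                                                      // ~3 GB according to the Storage tab
Dataset<Row> df2 = df1.groupBy("col1").pivot("col2").mean("col3");  // wide dataframe (4,827 columns)
Dataset<Row> df3 = df1.groupBy("col2").pivot("col1").mean("col3");  // transposed layout (40,107 columns)
// ... Imputer / na.fill, VectorAssembler and PCA follow on df3 ...
df1.unpersist();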

I have seen and tried many solutions, but without any result. Among them:

  1. Repartitioning df5 or df4 to 16, 64, 192, 256, 1000 and 4000 partitions (although the data does not look skewed) – see the sketch after this list.
  2. Changing spark.sql.shuffle.partitions to 16, 64, 192, 256, 1000 and 4000.
  3. Using 1 or 2 cores per executor so that each task gets more memory.
  4. Having 2 executors with 2 or 4 cores each.
  5. Changing "spark.memory.fraction" to 0.8 and "spark.memory.storageFraction" to 0.4.
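
As a concrete illustration, the first two attempts look roughly like this in code (a sketch; df5 and sp refer to the snippets elsewhere in this question, and 192 stands in for the various values listed):

// Attempt 1: explicit repartitioning before the PCA.
df5 = df5.repartition(192);
// Attempt 2: changing the shuffle parallelism at runtime.
sp.conf().set("spark.sql.shuffle.partitions", "192");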

Always the same error! How is it possible to blow through all this memory? Is it possible that the df does not actually fit in memory? Please let me know if you need any other information or screenshots.

EDIT 1

I changed the cluster to 2 Spark workers with 1 executor each and spark.sql.shuffle.partitions=48. Each executor has 115 GB and 8 cores. Below is the code where I load the file (2.2 GB), convert each line into a dense vector and feed it to the PCA.

Each row in the file has this format (4,568 rows with 40,107 double values each):

 "[x1,x2,x3,...]"

And the code:

Dataset<Row> df1 = sp.read().format("com.databricks.spark.csv").option("header", "true").load("/home/ubuntu/yolo.csv");
StructType schema2 = new StructType(new StructField[] {
                        new StructField("intensity",new VectorUDT(),false,Metadata.empty())
            });
Dataset<Row> df = df1.map((Row originalrow) -> {
                    // Each input row is a single string of the form "[x1,x2,...]".
                    String yoho = originalrow.get(0).toString();
                    int sizeyoho = yoho.length();
                    String yohi = yoho.substring(1, sizeyoho-1); // strip the surrounding brackets
                    String[] yi = yohi.split(",");
                    int s = yi.length;
                    double[] tmplist = new double[s];
                    for(int i=0;i<s;i++){
                        tmplist[i] = Double.parseDouble(yi[i]);
                    }
                    // Wrap the parsed values in a dense ML vector.
                    Row newrow = RowFactory.create(Vectors.dense(tmplist));
                    return newrow;
            }, RowEncoder.apply(schema2));
PCAModel pcaexp = new PCA()
                    .setInputCol("intensity")
                    .setOutputCol("pcaFeatures")
                    .setK(2)
                    .fit(df);

The exact error I get in the stderr of one of the 2 workers is:

ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 43)
java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

And this is the Stages tab of the Spark UI (screenshot omitted).

And this is the stage that fails (treeAggregate at RowMatrix.scala:122; screenshot omitted).

EDIT 2

EDIT 3

I read the whole file but take only 10 values from each row when creating the dense vector, and I still get the same error! I have a master with 235 GB of RAM and 3 workers (1 executor each, with 4 cores) and 64 GB of RAM per executor. How could this be happening? (Don't forget that the total size of the file is only 2.3 GB!)

Dataset<Row> df1 = sp.read().format("com.databricks.spark.csv").option("header", "true").load("/home/ubuntu/yolo.csv");

StructType schema2 = new StructType(new StructField[] {
                        new StructField("intensity",new VectorUDT(),false,Metadata.empty())
            });
Dataset<Row> df = df1.map((Row originalrow) -> {
                    String yoho =originalrow.get(0).toString();
                    int sizeyoho=yoho.length();
                    String yohi = yoho.substring(1, sizeyoho-1);
                    String[] yi = yohi.split(",");//this string array has all 40.107 values
                    int s = yi.length;
                    double[] tmplist= new double[s];
                    for(int i=0;i<10;i++){//I narrow it down to take only the first 10 values of each row
                        tmplist[i]=Double.parseDouble(yi[i]);
                    }
                    Row newrow = RowFactory.create(Vectors.dense(tmplist));
                    return newrow;
            }, RowEncoder.apply(schema2));
      
PCAModel pcaexp = new PCA()
                    .setInputCol("intensity")
                    .setOutputCol("pcaFeatures")
                    .setK(2)
                    .fit(df);

Answer

"Missing an output location for shuffle" occurs when your Spark application performs big shuffle stages: it tries to move a huge amount of data between executors, and there are problems in your cluster network.

Spark is telling you that you run out of memory in some stage. You are doing transformations that require several stages, and they consume memory too. Besides, you persist the dataframe first, and you should check the storage level, because it is possible that you are persisting it in memory only.
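
For example, a minimal sketch of persisting with an explicit storage level so that cached blocks can spill to disk instead of competing with execution memory on the heap (an illustration, not the original poster's code):

import org.apache.spark.storage.StorageLevel;

df1.persist(StorageLevel.MEMORY_AND_DISK());
// ... pivots, VectorAssembler, PCA ...
df1.unpersist();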

You are chaining several wide Spark transformations: when the first pivot stage is executed, for example, Spark creates a stage and performs a shuffle to group by your column, and maybe you have data skew, so some executors consume much more memory than the others and the error can happen in one of them.
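
One quick way to check for that kind of skew (a hedged sketch, assuming col1 is the group key from the question) is to look at the group sizes:

import static org.apache.spark.sql.functions.desc;

// How unevenly are the rows distributed over the group/pivot key?
df1.groupBy("col1").count().orderBy(desc("count")).show(20);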

Besides the dataframe transformations, the PCA estimator converts the dataframe to an RDD, which needs even more memory to compute the covariance matrix, and it works with dense representations of Breeze matrices of NxN elements which are not distributed. For example, the SVD is done with Breeze. That puts a lot of pressure on one of the executors.

Maybe you can save the resulting dataframe to HDFS (or wherever) and do the PCA in another Spark application.
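
A minimal sketch of that split, assuming Parquet on HDFS, a placeholder path, and the column names from the question (the ML vector column can be written to Parquet as-is):

// First application: write the assembled dataframe.
df5.write().mode("overwrite").parquet("hdfs:///tmp/pca_input");

// Second application: read it back and run only the PCA.
Dataset<Row> pcaInput = sp.read().parquet("hdfs:///tmp/pca_input");
PCAModel pcaModel = new PCA()
        .setInputCol("intensity")
        .setOutputCol("pcaFeatures")
        .setK(2)
        .fit(pcaInput);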

The main problem you have is that, before the SVD, the algorithm needs to compute the Gramian matrix and it uses a treeAggregate over the RDD. This creates a very big matrix of doubles (with 40,107 columns, a dense 40,107 x 40,107 matrix is on the order of 40,107² x 8 bytes ≈ 12.9 GB) that will be sent to the driver, and the error happens because your driver does not have enough memory. You need to increase the driver memory dramatically. You also have network errors: if one executor loses its connection, the job crashes and does not try to re-execute.

Personally, I would try to do the PCA directly in Breeze (or Smile) in the driver, that is, collect the RDD field, because the dataset is quite a bit smaller than the covariance matrix, and do it manually with a float representation.

Code to compute the PCA with Breeze only, using neither Spark nor treeAggregate:

import breeze.linalg._
import breeze.linalg.svd._
import org.apache.spark.sql.DataFrame
import scala.collection.mutable

object PCACode {
  
  def mean(v: Vector[Double]): Double = v.valuesIterator.sum / v.size

  def zeroMean(m: DenseMatrix[Double]): DenseMatrix[Double] = {
    val copy = m.copy
    for (c <- 0 until m.cols) {
      val col = copy(::, c)
      val colMean = mean(col)
      col -= colMean
    }
    copy
  }

  def pca(data: DenseMatrix[Double], components: Int): DenseMatrix[Double] = {
    val d = zeroMean(data)
    val SVD(_, _, v) = svd(d.t)
    val model = v(0 until components, ::)
    val filter = model.t * model
    filter * d
  }
  
  def main(args: Array[String]) : Unit = {
    val df : DataFrame = ???

    /** Collect the data and do the processing. Convert string to double, etc **/
    val data: Array[mutable.WrappedArray[Double]] =
      df.rdd.map(row => (row.getAs[mutable.WrappedArray[Double]](0))).collect()

    /** Once you have the Array, create the matrix and do the PCA **/
    val matrix = DenseMatrix(data.toSeq:_*)
    val pcaRes = pca(matrix, 2)

    println("result pca \n" + pcaRes)
  }
}

This code will do the PCA in the driver, so check the memory there. If it crashes, it could be done with a float representation to halve the memory.
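
If you prefer to stay in Java, a minimal sketch (assuming the vector column is called "intensity", as in the question) of collecting the vectors to the driver before handing them to a local linear-algebra library:

import java.util.List;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Row;

// Collect the vector column to the driver as a plain double[][] (rows x features).
List<Row> rows = df.select("intensity").collectAsList();
double[][] data = new double[rows.size()][];
for (int i = 0; i < rows.size(); i++) {
    data[i] = ((Vector) rows.get(i).get(0)).toArray();
}
// ... feed 'data' to the local PCA implementation of your choice ...

With 4,568 rows and 40,107 features this array is roughly 1.5 GB, far smaller than the ~12.9 GB Gramian, so it should fit in a reasonably sized driver.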
