Statistics.corr of very large RDD[Vector] in Spark Causes Generated Code Limits to Be Reached: How to Fix?

Problem Description

I have a data frame with an arbitrarily large number of rows, created by doing something similar to:

// pivot data to wide format
val wide = df.groupBy("id").pivot("ip").sum("msgs")

// drop columns and fill in null values
val dfmat = wide.drop("id").na.fill(0)
val dimnames = dfmat.columns
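
For illustration, here is a minimal sketch of what the pivot produces, using hypothetical id/ip/msgs values (and assuming a SQLContext in scope):

val sample = sqlContext.createDataFrame(Seq(
  ("a", "10.0.0.1", 3L),
  ("a", "10.0.0.2", 1L),
  ("b", "10.0.0.1", 7L)
)).toDF("id", "ip", "msgs")

// one column per distinct ip, nulls where an (id, ip) pair has no data
sample.groupBy("id").pivot("ip").sum("msgs").show()
// +---+--------+--------+
// | id|10.0.0.1|10.0.0.2|
// +---+--------+--------+
// |  a|       3|       1|
// |  b|       7|    null|
// +---+--------+--------+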

I have no idea how many different "ip"s there will be. I'm then trying to take each row of dfmat and create an RDD[Vector] for use with org.apache.spark.mllib.stat.Statistics.corr. To do that, I'm doing the below and running into errors:

import org.apache.spark.mllib.linalg.{Matrix, Vectors}
import org.apache.spark.mllib.stat.Statistics

// try a different mapping: render each row as "[v1,v2,...]" and parse it into a Vector
val mat = dfmat.rdd.map(row => Vectors.parse(row.mkString("[", ",", "]")))

// create correlation matrix
val correlMatrix: Matrix = Statistics.corr(mat, "pearson")
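
(As an aside, one way to sidestep the string round-trip through Vectors.parse is to build each vector directly from the row values; a sketch, assuming every remaining column is numeric:)

// build the dense vector directly instead of printing and re-parsing
val matDirect = dfmat.rdd.map(row =>
  Vectors.dense(row.toSeq.map(_.toString.toDouble).toArray)
)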

This works fine for small datasets (1 million records or fewer), but fails when operating on the full dataset. I also get really, really big logs containing strange generated-code records like:

/* 125222 */     this.value_8326 = -1L;
/* 125223 */     this.isNull_8327 = true;
/* 125224 */     this.value_8327 = -1L;
/* 125225 */     this.isNull_8328 = true;
/* 125226 */     this.value_8328 = -1L;
/* 125227 */     this.isNull_8329 = true;
/* 125228 */     this.value_8329 = -1L;
/* 125229 */     this.isNull_8330 = true;
/* 125230 */     this.value_8330 = -1L;
/* 125231 */     this.isNull_8331 = true;
/* 125232 */     this.value_8331 = -1L;
/* 125233 */     this.isNull_8332 = true;
/* 125234 */     this.value_8332 = -1L;
/* 125235 */     this.isNull_8333 = true;
/* 125236 */     this.value_8333 = -1L;
/* 125237 */   }
/* 125238 */
/* 125239 */   public org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection target(org.apache.spark.sql.catalyst.expressions.MutableRow row) {
/* 125240 */     mutableRow = row;
/* 125241 */     return this;
/* 125242 */   }
/* 125243 */
/* 125244 */   /* Provide immutable access to the last projected row. */
/* 125245 */   public InternalRow currentValue() {
/* 125246 */     return (InternalRow) mutableRow;
/* 125247 */   }
/* 125248 */
/* 125249 */   public java.lang.Object apply(java.lang.Object _i) {
/* 125250 */     InternalRow i = (InternalRow) _i;
/* 125251 */     apply16668_0(i);
/* 125252 */     apply16668_1(i);
/* 125253 */     apply16668_2(i);
/* 125254 */     apply16668_3(i);
/* 125255 */     apply16668_4(i);
/* 125256 */     apply16668_5(i);
/* 125257 */     apply16668_6(i);
/* 125258 */     apply16668_7(i); 
/* 125259 */     apply16668_8(i);
/* 125260 */     apply16668_9(i);
/* 125261 */     apply16668_10(i);
/* 125262 */     apply16668_11(i);
/* 125263 */     apply16668_12(i);
/* 125264 */     apply16668_13(i);
/* 125265 */     apply16668_14(i);
/* 125266 */     apply16668_15(i);
/* 125267 */     apply16668_16(i);
/* 125268 */     apply16668_17(i);
/* 125269 */     apply16668_18(i);
/* 125270 */     // copy all the results into MutableRow
/* 125271 */     apply16669_0(i); 
/* 125272 */     apply16669_1(i);
/* 125273 */     apply16669_2(i);
/* 125274 */     apply16669_3(i);
/* 125275 */     apply16669_4(i);
/* 125276 */     apply16669_5(i);
/* 125277 */     apply16669_6(i);
/* 125278 */     apply16669_7(i);
/* 125279 */     apply16669_8(i);
/* 125280 */     apply16669_9(i);
/* 125281 */     apply16669_10(i);
/* 125282 */     apply16669_11(i);
/* 125283 */     apply16669_12(i);
/* 125284 */     apply16669_13(i);
/* 125285 */     apply16669_14(i);
/* 125286 */     apply16669_15(i);
/* 125287 */     apply16669_16(i);
/* 125288 */     apply16669_17(i);
/* 125289 */     apply16669_18(i);
/* 125290 */     apply16669_19(i);
/* 125291 */     apply16669_20(i);
/* 125292 */     apply16669_21(i);
/* 125293 */     apply16669_22(i);
/* 125294 */     apply16669_23(i);
/* 125295 */     return mutableRow;
/* 125296 */   }
/* 125297 */ }
/* 125298 */

And finally:

at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:555)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:575)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:572)
    at org.spark-project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at org.spark-project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
    ... 31 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Lorg/apache/spark/sql/catalyst/expressions/Expression;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection" grows beyond 64 KB
    at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
    at org.codehaus.janino.CodeContext.write(CodeContext.java:854)
    at org.codehaus.janino.CodeContext.writeShort(CodeContext.java:959)
    at org.codehaus.janino.UnitCompiler.writeConstantFieldrefInfo(UnitCompiler.java:10279)
    at org.codehaus.janino.UnitCompiler.putfield(UnitCompiler.java:9956)
    at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5086)
    at org.codehaus.janino.UnitCompiler.access$11800(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$15.visitFieldAccess(UnitCompiler.java:5062)
    at org.codehaus.janino.Java$FieldAccess.accept(Java.java:3235)
    at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
    at org.codehaus.janino.UnitCompiler.compileSet2(UnitCompiler.java:5095)
    at org.codehaus.janino.UnitCompiler.access$11900(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$15.visitFieldAccessExpression(UnitCompiler.java:5063)
    at org.codehaus.janino.Java$FieldAccessExpression.accept(Java.java:3563)
    at org.codehaus.janino.UnitCompiler.compileSet(UnitCompiler.java:5070)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2675)
    at org.codehaus.janino.UnitCompiler.access$4500(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$7.visitAssignment(UnitCompiler.java:2619)
    at org.codehaus.janino.Java$Assignment.accept(Java.java:3405)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2654)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1643)
    at org.codehaus.janino.UnitCompiler.access$1100(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$4.visitExpressionStatement(UnitCompiler.java:936)
    at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2097)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:958)
    at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1007)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:2293)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:518)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:658)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:662)
    at org.codehaus.janino.UnitCompiler.access$600(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:350)
    at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1035)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
    at org.codehaus.janino.UnitCompiler.compileDeclaredMemberTypes(UnitCompiler.java:769)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:532)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:185)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:347)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1139)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:354)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:322)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:383)
    at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:315)
    at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:233)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:192)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:84)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:550)
    ... 35 more

This looks like an error caused by automatic code generation, but I'm not quite sure what is going on. Any ideas on how to debug this, or on how to do the same thing in a different way, are appreciated. If there is no other suitable way to do the same thing, how do I shrink the automatically generated code below the limit? Can I change the limit itself?

Thanks

Recommended Answer

Why pivot at all? It is an expensive and extremely inefficient operation. Just create a matrix from the data you already have.

First, let's aggregate your data:

import org.apache.spark.sql.functions.{col, sum}

val cols = Seq("id", "ip")

// one row per (id, ip) pair, with the summed message count
val aggregated = df.groupBy(cols.map(col(_)): _*).agg(sum($"msgs").alias("msgs"))

Index the required columns:

import org.apache.spark.ml.feature.StringIndexer

val cols = Seq("id", "ip")

// fit one indexer per column, mapping each distinct value to a 0-based index
val indexers = cols.map(c =>
  new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx").fit(aggregated)
)

// apply both indexers and keep only (id_idx, ip_idx, msgs) as (long, long, double)
val indexed = indexers.foldLeft(aggregated)((d, t) => t.transform(d)).select(
  cols.map(c => col(s"${c}_idx").cast("long")) :+ $"msgs".cast("double"): _*
)
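
If you still need the dimension names (the dimnames from the pivot version) to label the correlation matrix later, they can be read off the fitted indexer models; a sketch using StringIndexerModel.labels:

// labels(i) is the original string value that was mapped to index i
val Seq(idLabels, ipLabels) = indexers.map(_.labels.toSeq)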

Create the matrix:

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.sql.Row

// build a sparse distributed matrix with id_idx as the row index
// and ip_idx as the column index (on Spark 2.x, use indexed.rdd.map)
val rows = new CoordinateMatrix(
  indexed.map { case Row(i: Long, j: Long, v: Double) => MatrixEntry(i, j, v) }
).toRowMatrix.rows

Statistics.corr(rows, "pearson")
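
Because this pipeline never materializes one column per distinct "ip", Catalyst never has to generate a projection wide enough to hit the JVM's 64 KB per-method bytecode limit. In the matrix, id_idx indexes rows and ip_idx indexes columns, so corr computes pairwise correlations between the ips, just as the pivot-based version intended. A sketch of reading the result (using the hypothetical ipLabels above):

val correlMatrix = Statistics.corr(rows, "pearson")
// correlMatrix(i, j) is the Pearson correlation between ipLabels(i) and ipLabels(j)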
