Calling function inside RDD map function in Spark cluster
Question
I was testing a simple string parser function that I defined in my code, but one of the worker nodes always fails at execution time. Here is the dummy code I've been testing:
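/* JUST A SIMPLE PARSER TO CLEAN PARENTHESES */
def parseString(field: String): String = {
  // The unescaped parentheses form a capture group, so this captures the whole non-empty string
  val Pattern = "(.*.)".r
  field match {
    case "null" => "null"
    // Turn '(' and ')' into spaces, then drop every space
    case Pattern(field) => field.replace('(',' ').replace(')',' ').replace(" ", "")
  }
}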
/* CREATE TWO DISTRIBUTED RDDs TO JOIN THEM */
val emp = sc.parallelize(Seq((1,"jordan",10), (2,"ricky",20), (3,"matt",30), (4,"mince",35), (5,"rhonda",30)), 6)
val dept = sc.parallelize(Seq(("hadoop",10), ("spark",20), ("hive",30), ("sqoop",40)), 6)
val manipulated_emp = emp.keyBy(t => t._3)
val manipulated_dept = dept.keyBy(t => t._2)
val left_outer_join_data = manipulated_emp.leftOuterJoin(manipulated_dept)
/* OUTPUT */
left_outer_join_data.collect.foreach(println)
/*
(30,((3,matt,30),Some((hive,30))))
(30,((5,rhonda,30),Some((hive,30))))
(20,((2,ricky,20),Some((spark,20))))
(10,((1,jordan,10),Some((hadoop,10))))
(35,((4,mince,35),None))
*/
val res = left_outer_join_data
.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString))
.collect
res
.map(f => ( f._1, f._2, parseString(f._3)))
.foreach(println)
/* DESIRED OUTPUT */
/*
(3,matt,hive,30)
(5,rhonda,hive,30)
(2,ricky,spark,20)
(1,jordan,hadoop,10)
(4,mince,null)
*/
This code works if I first collect the results of res in the driver. Since this is a test, that is not a problem, but my actual application will deal with millions of rows, and collecting results in the driver is discouraged. So if I do the same without collecting first, like this:
val res = left_outer_join_data
.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString))
res
.map(f => ( f._1, f._2, parseString(f._3)))
.foreach(println)
I get the following:
ERROR TaskSetManager: Task 5 in stage 17.0 failed 4 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 17.0 failed 4 times, most recent failure: Lost task 5.3 in stage 17.0 (TID 166, 192.168.28.101, executor 1): java.lang.NoClassDefFoundError: Could not initialize class tele.com.SimcardMsisdn$
at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:916)
at tele.com.SimcardMsisdn$.main(SimcardMsisdn.scala:249)
at tele.com.SimcardMsisdn.main(SimcardMsisdn.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class tele.com.SimcardMsisdn$
at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
at tele.com.SimcardMsisdn$$anonfun$main$1.apply(SimcardMsisdn.scala:249)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Why does Spark fail to execute my parser on the nodes? Could you please recommend a solution or workaround?
Update
I found a solution to this problem (posted below). Nonetheless, I'm still confused about this issue; maybe it is something that I'm doing wrong.
Recommended answer
Well, I've managed to solve it myself by broadcasting the Pattern variable to the workers:
val Pattern = sc.broadcast("(.*.)".r)
and doing the pattern matching within the map, rather than in a separate function, and without collecting to the driver:
val res = left_outer_join_data.map(f => (f._2._1._1, f._2._1._2, f._2._2.getOrElse("null").toString))
res.map(f => (f._1, f._2, f._3 match {
    case "null" => "null"
    case Pattern.value(f._3) => f._3.replace('(',' ').replace(')',' ').replace(" ", "")
  }))
  .foreach(println)
Then I got the desired output in the worker stdout:
(3,matt,hive,30)
(5,rhonda,hive,30)
(2,ricky,spark,20)
(1,jordan,hadoop,10)
(4,mince,null)
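A note on the likely cause, with a possible alternative to broadcasting. `Could not initialize class tele.com.SimcardMsisdn$` generally means the JVM tried to run the initializer of the `SimcardMsisdn` singleton on the executor and it failed. Calling `parseString` from inside the closure references that enclosing object, so each executor has to initialize it. The sketch below assumes the failure comes from driver-only state in the enclosing object's body, which the question does not show. Under that assumption, moving the parser into a small standalone object may avoid the problem. `StringParsers` and `ParserDemo` are hypothetical names, not part of the original code, and a plain Scala `Seq` stands in for the RDD so the example runs without a cluster.

```scala
// Hypothetical standalone helper: it holds no SparkContext or other
// driver-only state, so initializing it on an executor has nothing to fail on.
object StringParsers {
  // Unescaped parentheses form a capture group, so this matches any
  // non-empty single-line string and captures all of it.
  private val Pattern = "(.*.)".r

  // Removes '(', ')' and spaces; passes the "null" placeholder through.
  def parseString(field: String): String = field match {
    case "null"         => "null"
    case Pattern(value) => value.replace("(", "").replace(")", "").replace(" ", "")
    case other          => other // e.g. empty input, which the regex does not match
  }
}

object ParserDemo {
  def main(args: Array[String]): Unit = {
    // Stand-in for the joined rows; with Spark the equivalent call would be
    // res.map(f => (f._1, f._2, StringParsers.parseString(f._3)))
    val rows = Seq((3, "matt", "(hive,30)"), (4, "mince", "null"))
    rows
      .map { case (id, name, dept) => (id, name, StringParsers.parseString(dept)) }
      .foreach(println)
    // prints (3,matt,hive,30) and (4,mince,null)
  }
}
```

The extra `case other` branch is an addition over the original parser: it keeps inputs the regex does not match (such as an empty string) from raising a `MatchError`.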