Suddenly throwing "This RDD lacks a SparkContext"; it was working before, when all the code was in the main method
Problem description
It was a working piece of code, but it suddenly stopped working after I tried creating the SparkSession from a different Scala object.
val b = a.filter { x => (!x._2._1.isEmpty) && (!x._2._2.isEmpty) }
val primary_key_distinct = b.map(rec => (rec._1.split(",")(0))).distinct
for (i <- primary_key_distinct) {
  b.foreach(println)
}
Error:
ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
It's not working even after I reverted the change, and I'm not using any objects.
Updated code:
import org.apache.spark.sql.SparkSession

object `try` { // `try` is a reserved word in Scala, so it must be backquoted
  def main(args: Array[String]) {
    val spark = SparkSession.builder().master("local").appName("50columns3nodes").getOrCreate()

    // Read both CSVs and flatten each row back into a comma-separated string
    var s = spark.read.csv("/home/hadoopuser/Desktop/input/source.csv").rdd.map(_.mkString(","))
    var k = spark.read.csv("/home/hadoopuser/Desktop/input/destination.csv").rdd.map(_.mkString(","))

    // Key each record by its first column
    val source_primary_key = s.map(rec => (rec.split(",")(0), rec))
    val destination_primary_key = k.map(rec => (rec.split(",")(0), rec))

    // Keep only the keys whose source and destination records differ
    val a = source_primary_key.cogroup(destination_primary_key).filter { x => ((x._2._1) != (x._2._2)) }
    val b = a.filter { x => (!x._2._1.isEmpty) && (!x._2._2.isEmpty) }
    var extra_In_Dest = a.filter(x => x._2._1.isEmpty && !x._2._2.isEmpty).map(rec => (rec._2._2.mkString("")))
    var extra_In_Src = a.filter(x => !x._2._1.isEmpty && x._2._2.isEmpty).map(rec => (rec._2._1.mkString("")))
    val primary_key_distinct = b.map(rec => (rec._1.split(",")(0))).distinct

    // This loop is where the exception is thrown (see the answer below)
    for (i <- primary_key_distinct) {
      var lengthofarray = 0
      println(i)
      b.foreach(println)
    }
  }
}
The input data follows:
s=1,david
2,ajay
3,jijo
4,abi
5,surendhar
k=1,david
2,ajay
3,jijoaa
4,abisdsdd
5,surendhar
val a contains {3,(jijo,jijoaa), 4,(abi,abisdsdd)}
Answer
If you read the first message carefully:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
It clearly states that actions and transformations cannot be performed inside a transformation.
primary_key_distinct is a transformation done on b, and b is itself a transformation done on a. And b.foreach(println) is an action performed inside the traversal of primary_key_distinct: the loop for (i <- primary_key_distinct) desugars to primary_key_distinct.foreach(i => ...), so the closure containing b.foreach(println) is serialized and run on the executors, where the deserialized copy of b no longer carries a valid SparkContext.
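This is exactly the pattern the error message warns about with rdd1.map(x => rdd2.values.count() * x). A minimal, self-contained sketch (hypothetical names, assuming a local SparkSession) that reproduces the exception and shows the driver-side alternative:

import org.apache.spark.sql.SparkSession

object Spark5063Demo { // hypothetical example, not the asker's code
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("spark5063").getOrCreate()
    val rdd1 = spark.sparkContext.parallelize(1 to 3)
    val rdd2 = spark.sparkContext.parallelize(4 to 6)

    // Throws "This RDD lacks a SparkContext": rdd2.count() is an action
    // executed inside rdd1.map, i.e. on an executor, not on the driver.
    rdd1.map(x => rdd2.count() * x).collect()

    // The driver-side equivalent works: run the action first, then use the result.
    val n = rdd2.count()
    rdd1.map(x => n * x).collect()
    spark.stop()
  }
}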
So if you collect b or primary_key_distinct on the driver, then the code should run properly:
val b = a.filter { x => (!x._2._1.isEmpty) && (!x._2._2.isEmpty) }.collect
or
val primary_key_distinct = b.map(rec => (rec._1.split(",")(0))).distinct.collect
Or, if you don't use an action inside another transformation, then the code should also run properly, as in:
for (i <- 1 to 2) {
  var lengthofarray = 0
  println(i)
  b.foreach(println)
}
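Applying that to the question's code, here is a sketch of the corrected loop (same variable names as the updated code above, with b left as an RDD and only the keys collected to the driver):

// Sketch: collect only the distinct keys to the driver, then iterate locally.
val keys: Array[String] = primary_key_distinct.collect()

for (i <- keys) {      // plain local iteration on the driver
  println(i)
  b.foreach(println)   // invoked from the driver, so this action is valid
}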
I hope the explanation is clear.