Null Pointer Exception When Trying to Use Persisted Table in Spark Streaming
Problem description
I am creating gpsLookUpTable at the beginning and persisting it so that I do not need to load it over and over again to do the mapping. However, when I try to access it inside foreach, I get a NullPointerException. Any help is appreciated, thanks.
Below is the code snippet:
def main(args: Array[String]): Unit = {
  val conf = new SparkConf() ...
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(20))
  val sqc = new SQLContext(sc)
  //////Trying to cache table here to use it below
  val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
  //sc.broadcast(gpsLookUpTable)
  ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
    .foreachRDD { rdd =>
      if (!rdd.partitions.isEmpty) {
        val allRows = sc.textFile("hdfs://localhost:9000/supportFiles/GeoHashLookUpTable")
        sqc.read.json(allRows).registerTempTable("GeoHashLookUpTable")
        val header = rdd.first().split(",")
        val rowsWithoutHeader = Utils.dropHeader(rdd)
        rowsWithoutHeader.foreach { row =>
          val singleRowArray = row.split(",")
          singleRowArray.foreach(println)
          (header, singleRowArray).zipped
            .foreach { (x, y) =>
              ///Trying to access persisted table but getting null pointer exception
              val selectedRow = gpsLookUpTable
                .filter("geoCode LIKE '" + GeoHash.subString(lattitude, longitude) + "%'")
                .withColumn("Distance", calculateDistance(col("Lat"), col("Lon")))
                .orderBy("Distance")
                .select("TrackKM", "TrackName").take(1)
              if (selectedRow.length != 0) {
                // do something
              }
              else {
                // do something
              }
            }
        }
      }
    }
I assume you are running on a cluster, so your foreach runs as a closure on other nodes. The NullPointerException is raised because that closure runs on a node that doesn't have an initialized gpsLookUpTable. You evidently tried to broadcast gpsLookUpTable in
//sc.broadcast(gpsLookUpTable)
But this needs to be bound to a variable, basically like this:
val tableBC = sc.broadcast(gpsLookUpTable)
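As a side note on why the closure can see an uninitialized table on another node: the following is a minimal plain-Scala sketch, assuming the table handle keeps its link to driver-side state in a transient field. DriverOnlyContext and TableHandle are hypothetical stand-ins invented for illustration, not Spark classes, and the serialization round trip only approximates what happens when a task is shipped to an executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for driver-only state such as a SQLContext.
class DriverOnlyContext { def describe: String = "driver context" }

// Hypothetical stand-in for a table handle that keeps a transient reference
// to driver-only state; transient fields are skipped during serialization.
class TableHandle(@transient val ctx: DriverOnlyContext, val name: String) extends Serializable

object TransientFieldDemo {
  // Serialize and deserialize a value, roughly what happens to the variables
  // captured by a closure when a task is sent to another JVM.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onDriver = new TableHandle(new DriverOnlyContext, "gpsLookUpTable")
    val onExecutor = roundTrip(onDriver)
    println(onExecutor.name)        // the plain field survives the round trip
    println(onExecutor.ctx == null) // the transient field comes back as null
    // Calling onExecutor.ctx.describe here would throw a NullPointerException.
  }
}
```

The point of the sketch is only that a reference which is valid on the driver can arrive as null on the other side, which matches the symptom described in the question.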
In foreach, you would replace this:
foreach { (x, y) =>
  ///Trying to access persisted table but getting null pointer exception
  val selectedRow = gpsLookUpTable
with this:
foreach { (x, y) =>
  // Access the table through the broadcast variable instead
  val selectedRow = tableBC.value
which effectively gives you access to the broadcast value.
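A self-contained sketch of the bind-then-read pattern with tableBC is shown below. One caveat, offered as an assumption rather than something stated above: a DataFrame is a handle tied to the driver's SQLContext, so what gets broadcast generally needs to be plain data (for example the collected rows) rather than the DataFrame itself. Everything here other than sc.broadcast and .value is hypothetical: the TrackPoint case class, the nearest helper, the sample rows, the local[2] master, and the plain Euclidean distance, which merely stands in for calculateDistance.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical row type standing in for one record of gpsLookUpTable.
case class TrackPoint(geoCode: String, lat: Double, lon: Double, trackKM: Double, trackName: String)

object BroadcastLookupSketch {

  // Plain-Scala lookup that is safe to run on executors: keep rows whose
  // geoCode starts with the prefix, then pick the one closest to (lat, lon).
  // Euclidean distance in degrees is only a stand-in for calculateDistance.
  def nearest(table: Seq[TrackPoint], prefix: String, lat: Double, lon: Double): Option[TrackPoint] = {
    val candidates = table.filter(_.geoCode.startsWith(prefix))
    if (candidates.isEmpty) None
    else Some(candidates.minBy(p => math.hypot(p.lat - lat, p.lon - lon)))
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast-lookup-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Assumption: in the real job these rows would come from collecting
      // gpsLookUpTable on the driver and mapping each Row to a TrackPoint.
      val lookupRows = Seq(
        TrackPoint("u4pruy", 57.64, 10.40, 12.5, "North"),
        TrackPoint("u4pruz", 57.70, 10.50, 13.0, "North"),
        TrackPoint("ezs42x", 42.60, -5.60, 88.1, "South")
      )
      // Bind the broadcast to a variable so closures can refer to it.
      val tableBC = sc.broadcast(lookupRows)

      // (geohash prefix, latitude, longitude) for each incoming record.
      val incoming = sc.parallelize(Seq(("u4pru", 57.65, 10.41), ("zzzzz", 0.0, 0.0)))

      val matched = incoming.map { case (prefix, lat, lon) =>
        // tableBC.value is the executor-local copy of the collected rows.
        nearest(tableBC.value, prefix, lat, lon).map(_.trackKM)
      }.collect()

      matched.foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Collecting first only makes sense when the lookup table fits comfortably in memory on every executor; for a large table, a join between the incoming data and the lookup table may be the better fit.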