Broadcast Variables not showing inside Partitions Apache Spark


Problem Description


Scenario and Problem: I want to add two attributes to a JSON object based on lookup-table values and insert the JSON into MongoDB. I have a broadcast variable that holds the lookup table. However, I am not able to access it inside foreachPartition, as you can see in the code. It does not give me any error but simply does not display anything, and because of that I cannot insert the JSON into MongoDB. I cannot find any explanation for this behaviour. Any explanation or workaround to make it work is much appreciated.

Here is my full code:

import com.mongodb.DBObject
import com.mongodb.casbah.MongoClient
import com.mongodb.util.JSON
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.json.JSONObject

object ProcessMicroBatchStreams {
  // GeoHash, Constants and MapInput are my own helper objects.
  val calculateDistance = udf {
    (lat: String, lon: String) =>
      GeoHash.getDistance(lat.toDouble, lon.toDouble)
  }
  val DB_NAME = "IRT"
  val COLLECTION_NAME = "sensordata"
  val records = Array[String]()

  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: ProcessMicroBatchStreams <master> <input_directory>")
      System.exit(1)
    }
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(this.getClass.getCanonicalName)
      .set("spark.hadoop.validateOutputSpecs", "false")
    /*.set("spark.executor.instances", "3")
      .set("spark.executor.memory", "18g")
      .set("spark.executor.cores", "9")
      .set("spark.task.cpus", "1")
      .set("spark.driver.memory", "10g")*/

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(60))
    val sqc = new SQLContext(sc)
    val gpsLookUpTable = MapInput.cacheMappingTables(sc, sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
    val broadcastTable = sc.broadcast(gpsLookUpTable)

    ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
      .foreachRDD { rdd =>
        // broadcastTable.value.show() // I can access the broadcast value here
        if (!rdd.partitions.isEmpty) {
          val partitionedRDD = rdd.repartition(4)
          partitionedRDD.foreachPartition { partition =>
            println("Inside Partition")
            broadcastTable.value.show() // I cannot access the broadcast value here
            partition.foreach { row =>
              val items = row.split("\n")
              items.foreach { item =>
                val mongoColl = MongoClient()(DB_NAME)(COLLECTION_NAME)
                val jsonObject = new JSONObject(item)
                val latitude = jsonObject.getDouble(Constants.LATITUDE)
                val longitude = jsonObject.getDouble(Constants.LONGITUDE)

                // The broadcast value is not being shown here.
                // However, there is no error shown,
                // and I cannot insert the value into MongoDB.
                val selectedRow = broadcastTable.value
                  .filter("geoCode LIKE '" + GeoHash.subString(latitude, longitude) + "%'")
                  .withColumn("Distance", calculateDistance(col("Lat"), col("Lon")))
                  .orderBy("Distance")
                  .select(Constants.TRACK_KM, Constants.TRACK_NAME).take(1)
                if (selectedRow.length != 0) {
                  jsonObject.put(Constants.TRACK_KM, selectedRow(0).get(0))
                  jsonObject.put(Constants.TRACK_NAME, selectedRow(0).get(1))
                } else {
                  jsonObject.put(Constants.TRACK_KM, "NULL")
                  jsonObject.put(Constants.TRACK_NAME, "NULL")
                }
                val record = JSON.parse(jsonObject.toString()).asInstanceOf[DBObject]
                mongoColl.insert(record)
              }
            }
          }
        }
      }
    sys.addShutdownHook {
      ssc.stop(true, true)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Solution

It looks like you're trying to broadcast an RDD (here, actually a DataFrame). An RDD or DataFrame is only a handle to distributed data, backed by the driver's SparkContext, so operations on it cannot run inside executor tasks; that is why nothing shows up inside foreachPartition. Collect the data on the driver first and broadcast the result. Try something like this:

val broadCastVal = gpsLookUpTable.collect()
val broadCastTable = sc.broadcast(broadCastVal)

You should be able to get the value you're expecting.
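To make the fix concrete, here is a minimal sketch of the corrected pattern, reusing the names from the question; the geoCode column index and the geohash prefix below are hypothetical placeholders, so adjust them to the real schema:

import org.apache.spark.sql.Row

// Driver side: collect() turns the distributed lookup table into a local
// Array[Row] -- plain data that Spark can serialize and ship to executors.
val broadCastVal: Array[Row] = gpsLookUpTable.collect()
val broadCastTable = sc.broadcast(broadCastVal)

partitionedRDD.foreachPartition { partition =>
  // Executor side: .value is an ordinary array, so it is usable here,
  // unlike a DataFrame, which needs the driver's SparkContext.
  val lookupRows: Array[Row] = broadCastTable.value
  partition.foreach { line =>
    // Filter with plain Scala collection operations instead of DataFrame ones.
    // Column index 0 for geoCode is an assumption about the schema.
    val prefix = "dr5r" // hypothetical geohash prefix for this record
    val matches = lookupRows.filter(_.getString(0).startsWith(prefix))
    // ... pick the nearest match, enrich the JSON, insert into MongoDB ...
  }
}

Note that whatever filter/withColumn/orderBy did on the DataFrame now has to be expressed as ordinary collection operations on the array; if the lookup table is too large to collect, a join between the stream and the lookup DataFrame on the driver side avoids the broadcast entirely.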
