How to join a DStream with a non-stream file?

Question

I'd like to join every RDD in a DStream with a non-streaming, unchanging reference file. Here is my code:

val sparkConf = new SparkConf().setAppName("LogCounter") 
val ssc =  new StreamingContext(sparkConf, Seconds(2)) 

val sc = new SparkContext() 
val geoData = sc.textFile("data/geoRegion.csv") 
            .map(_.split(',')) 
            .map(line => (line(0), (line(1),line(2),line(3),line(4)))) 

val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap 
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 

val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details removed for brevity 
val vdpJoinedGeo = goodIPsFltrBI.transform(rdd =>rdd.join(geoData)) 

I'm getting many, many errors, the most common being:

14/11/19 19:58:23 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: http://10.102.71.92:40764/broadcast_1

I think I should be broadcasting geoData instead of reading it in with each task (it's a 100MB file), but I'm not sure where to put the code that initializes geoData the first time.

Also I'm not sure if geoData is even defined correctly (maybe it should use ssc instead of sc?). The documentation I've seen just lists the transform and join but doesn't show how the static file was created.

Any ideas on how to broadcast geoData and then join it to each streaming RDD?

Answer

• FileNotFound Exception:

  The geoData textFile is loaded on all workers from the provided location ("data/geoRegion.csv"). Most probably this file is only available on the driver, so the workers cannot load it, which throws a FileNotFoundException.

• Broadcast variables:

  Broadcast variables are defined on the driver and used on the workers by unwrapping the broadcast container to get its content. This means that the data held by a broadcast variable must be loaded on the driver at the time the job is defined.
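
  As a minimal sketch of that pattern (illustrative names only, not the question's code; it assumes an existing SparkContext sc and an RDD[String] rdd):

  val lookup = Map("1.2.3.4" -> "region-A")    // small reference data, loaded on the driver
  val lookupBC = sc.broadcast(lookup)          // broadcast variable defined on the driver
  rdd.map(ip => (ip, lookupBC.value.get(ip)))  // workers unwrap it with .value inside the closure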

  This might solve two problems in this case: assuming that the geoRegion.csv file is located on the driver node, it allows this data to be loaded properly on the driver and spread efficiently over the cluster.

  In the code above, replace the geoData loading with a local-file-reading version:

  import scala.io.Source

  // Read the reference file on the driver into a plain Map (not an RDD)
  val geoData = Source.fromFile("data/geoRegion.csv").getLines()
    .map(_.split(','))
    .map(line => (line(0), (line(1), line(2), line(3), line(4))))
    .toMap

  val geoDataBC = sc.broadcast(geoData)
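
  (On the question's "maybe it should use ssc instead of sc?" doubt: rather than constructing a second SparkContext alongside the StreamingContext, one option is to reuse the context backing it, e.g. val sc = ssc.sparkContext.)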
      

  To use it, access the broadcast contents within a closure. Note that you get back the Map previously wrapped in the broadcast variable: it is a plain object, not an RDD, so in this case you cannot use join to merge the two datasets. You can use flatMap instead:

  val vdpJoinedGeo = goodIPsFltrBI.flatMap { ip => geoDataBC.value.get(ip).map(data => (ip, data)) }
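
  Since get returns an Option that flatMap flattens away, this behaves like an inner join: IPs with no entry in geoData are dropped. A sketch of a left-join-style alternative (vdpLeftJoinedGeo is a hypothetical name), if unmatched IPs should be kept:

  val vdpLeftJoinedGeo = goodIPsFltrBI.map(ip => (ip, geoDataBC.value.get(ip)))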
      
