Spark Streaming: how to propagate updates to a Broadcast variable to the whole cluster?

Question

I have a module in the Spark driver that listens to a Kafka queue. Depending on the content of the queue, I need to modify the content of a Broadcast variable (or a closure). In this example, the variable could be a String.

For example, if the string "change" arrives on the queue, I need to update the Broadcast variable on every node.

I would like to see a clean and performant pattern for doing this, or at least some pointers to material that would help me better understand how to propagate modifications across a Spark cluster.

Solution

Broadcast variables do indeed propagate variables, or whole closures, to the Spark cluster using a peer-to-peer protocol.

From the Learning Spark book (http://shop.oreilly.com/product/0636920028512.do):

A broadcast variable is simply an object of type spark.broadcast.Broadcast[T], which wraps a value of type T. We can access this value by calling value on the Broadcast object in our tasks. The value is sent to each node only once, using an efficient, BitTorrent-like communication mechanism.

What will affect performance is the serialization method you use (e.g., Kryo, a custom serializer, ...).
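As a sketch of the serialization point, the configuration below switches Spark to Kryo and pre-registers the class of the broadcast value. `CallSignTable` is a hypothetical type standing in for whatever you broadcast; `spark.serializer` and `SparkConf.registerKryoClasses` are standard Spark settings, but whether Kryo is actually faster for your data is something to measure rather than assume.

```scala
import org.apache.spark.SparkConf

object KryoBroadcastConf {
  // Hypothetical value type for the broadcast (not part of Spark or the book).
  final case class CallSignTable(prefixes: Array[String])

  // Builds a SparkConf that serializes with Kryo and pre-registers the
  // classes that will travel inside broadcast variables and tasks.
  def build(): SparkConf =
    new SparkConf()
      .setAppName("broadcast-with-kryo")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[CallSignTable], classOf[Array[String]]))
}
```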

There is an example in the book:

Example 6-8. Country lookup with Broadcast values in Scala

// Look up the countries for each call sign for the
// contactCounts RDD. We load an array of call sign
// prefixes to country code to support this lookup.

val signPrefixes = sc.broadcast(loadCallSignTable())

val countryContactCounts = contactCounts.map {
    case (sign, count) =>
        val country = lookupInArray(sign, signPrefixes.value)
        (country, count)
    }.reduceByKey((x, y) => x + y)

countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
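The book snippet relies on `loadCallSignTable` and `lookupInArray`, which are not shown in this excerpt. The sketch below fills them in with hypothetical implementations (a tiny hard-coded "prefix,country" table and a simple longest-prefix scan, not the book's own code) so that the same broadcast-then-lookup pipeline can run on a local `SparkContext`.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object CountryLookup {
  // Hypothetical stand-in for loadCallSignTable(): "prefix,country" rows.
  def loadCallSignTable(): Array[String] =
    Array("K,US", "VE,CA", "W,US").sorted

  // Hypothetical stand-in for lookupInArray(): pick the longest prefix the sign starts with.
  def lookupInArray(sign: String, table: Array[String]): String =
    table.map(_.split(","))
      .filter(row => sign.startsWith(row(0)))
      .sortBy(row => -row(0).length)
      .headOption
      .map(_(1))
      .getOrElse("UNKNOWN")

  def countryCounts(sc: SparkContext, contactCounts: RDD[(String, Int)]): Map[String, Int] = {
    // The table is shipped to each executor once and read through .value in tasks.
    val signPrefixes = sc.broadcast(loadCallSignTable())
    contactCounts.map { case (sign, count) =>
      val country = lookupInArray(sign, signPrefixes.value)
      (country, count)
    }.reduceByKey(_ + _).collect().toMap
  }
}
```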

As shown in these examples, the process of using broadcast variables is simple:

1. Create a Broadcast[T] by calling SparkContext.broadcast on an object of type T. Any type works as long as it is also Serializable.
2. Access its value with the value property (or value() method in Java).
3. The variable will be sent to each node only once, and should be treated as read-only (updates will not be propagated to other nodes).
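Because a broadcast value is read-only, one common way to "update" it is not to mutate it but to create a new Broadcast on the driver and make later batches use that one. The sketch below assumes a hypothetical driver-side helper, `RefreshableBroadcast` (not a Spark API), whose `update` would be called from the Kafka listener and whose `get` is read once per batch. In Spark Streaming the function passed to `foreachRDD` or `transform` runs on the driver for every batch, so that is where `get` belongs; here a plain loop over a local `SparkContext` stands in for the batches.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast
import scala.reflect.ClassTag

// Hypothetical helper (not part of Spark): lives only on the driver and swaps
// in a fresh Broadcast whenever the value changes. Each Broadcast stays read-only.
final class RefreshableBroadcast[T: ClassTag](sc: SparkContext, initial: T) {
  private var current: Broadcast[T] = sc.broadcast(initial)

  // Call on the driver when a new value arrives (e.g. from the Kafka listener thread).
  def update(newValue: T): Unit = synchronized {
    current.unpersist(blocking = false) // drop cached copies on the executors
    current = sc.broadcast(newValue)    // ship the new value to the cluster
  }

  // Call on the driver while building each batch's job.
  def get: Broadcast[T] = synchronized(current)
}

object RefreshableBroadcastDemo {
  // Each element of `batches` plays the role of one streaming batch; the body
  // corresponds to the driver-side function you would pass to foreachRDD.
  def run(sc: SparkContext, batches: Seq[String]): Seq[String] = {
    val mode = new RefreshableBroadcast(sc, "initial")
    batches.map { msg =>
      if (msg == "change") mode.update("changed") // the queue asked for an update
      val bc = mode.get // capture the Broadcast itself, never the holder
      sc.parallelize(Seq(1, 2, 3)).map(x => s"${bc.value}-$x").collect().mkString(",")
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("refresh-broadcast").setMaster("local[2]"))
    try run(sc, Seq("noop", "change", "noop")).foreach(println)
    finally sc.stop()
  }
}
```

Tasks that already captured the old Broadcast keep seeing the old value; only jobs built after `update` see the new one. `unpersist` only drops the executor copies (a task that still needs the old value can re-fetch it from the driver), whereas `destroy` would make the old Broadcast unusable, so it is the riskier choice while earlier batches may still be running.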
