Caching in Spark


Question

A function is defined to transform an RDD. Therefore, the function is called once for each element in the RDD.

The function needs to call an external web service to look up reference data, passing data from the current element in the RDD as a parameter.
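
For concreteness, a minimal sketch of that setup; fetchValue stands in for the web service call and foreignId for the per-element data passed to it (both names are hypothetical, matching the examples in the answer below):

def fetchValue(key: String): String = ??? // hypothetical: one HTTP call to the external web service
val withValues = inputRdd.map(record => (record, fetchValue(record.foreignId))) // called once per element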

Two questions:


  1. Is there an issue with issuing a web service call within Spark?

  2. The data from the web service needs to be cached. What is the best way to hold (and subsequently reference) the cached data? The simple way would be to hold the cache in a collection within the Scala class that contains the function being passed to the RDD. Would this be efficient, or is there a better approach for caching in Spark?

Thanks

Answer

There isn't really any mechanism for "caching" (in the sense that you mean). Seems like the best approach would be to split this task into two phases:


  1. Get the distinct "keys" by which you must access the external lookup, and perform the lookup once for each key
  2. Use this mapping to perform the lookup for each record in the RDD

I'm assuming there would potentially be many records accessing the same lookup key (otherwise "caching" won't be of any value anyway), so performing the external calls for the distinct keys only is substantially faster; for example, an RDD of 10 million records with 10,000 distinct keys needs 10,000 web service calls instead of 10 million.

How you should implement this:


  • If you know this set of distinct keys is small enough to fit into your driver machine's memory:


  • map your data to the distinct keys by which you'd want to cache these fetched values, and collect them, e.g.: val keys = inputRdd.map(/* get key */).distinct().collect()
  • perform the fetching on the driver side (not using Spark)
  • use the resulting Map[Key, FetchedValue] in any transformation on your original RDD - it will be serialized and sent to each worker, where you can perform the lookup. For example, assuming the input has records for which the foreignId field is the lookup key:

val keys = inputRdd.map(record => record.foreignId).distinct().collect()
val lookupTable = keys.map(k => (k, fetchValue(k))).toMap // one call per distinct key, on the driver
val withValues = inputRdd.map(record => (record, lookupTable(record.foreignId))) // lookupTable is captured in the closure and shipped to each worker


  • Alternatively - if this map is large (but can still fit in the driver's memory), you can broadcast it before you use it in an RDD transformation - see Broadcast Variables in Spark's Programming Guide.
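
    A minimal sketch of the broadcast variant, reusing the hypothetical fetchValue and foreignId from above and assuming sc is the SparkContext:

    val keys = inputRdd.map(record => record.foreignId).distinct().collect()
    val lookupTable = sc.broadcast(keys.map(k => (k, fetchValue(k))).toMap) // shipped to each worker once
    val withValues = inputRdd.map(record => (record, lookupTable.value(record.foreignId))) // workers read it via .value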

    Otherwise (if this map might be too large) - you'll need to use join if you want to keep the data in the cluster, but still refrain from fetching the same element twice:

    val byKeyRdd = inputRdd.keyBy(record => record.foreignId)
    val lookupTableRdd = byKeyRdd
      .keys
      .distinct()
      .map(k => (k, fetchValue(k))) // this time fetchValue runs in the cluster - concurrently for different keys
    val withValues = byKeyRdd.join(lookupTableRdd) // RDD[(Key, (Record, FetchedValue))]

