Optimal way of creating a cache in the PySpark environment


Question

I am using Spark Streaming to create a system that enriches incoming data from a Cloudant database. Example:

Incoming Message: {"id" : 123}
Outgoing Message: {"id" : 123, "data": "xxxxxxxxxxxxxxxxxxx"}

My code for the driver class is as follows:

from Sample.Job import EnrichmentJob
from Sample.Job import FunctionJob
import pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession

from kafka import KafkaConsumer, KafkaProducer
import json

class SampleFramework():

    def __init__(self):
        pass

    @staticmethod
    def messageHandler(m):
        return json.loads(m.message)

    @staticmethod
    def processData(rdd):

        if (rdd.isEmpty()):
            print("RDD is Empty")
            return

        # Expand
        expanded_rdd = rdd.mapPartitions(EnrichmentJob.enrich)

        # Score
        scored_rdd = expanded_rdd.map(FunctionJob.function)

        # Publish RDD


    def run(self, ssc):

        self.ssc = ssc

        directKafkaStream = KafkaUtils.createDirectStream(self.ssc, QUEUENAME, \
                                                          {"metadata.broker.list": META, 
                                                          "bootstrap.servers": SERVER}, \
                                                          messageHandler= SampleFramework.messageHandler)

        directKafkaStream.foreachRDD(SampleFramework.processData)

        ssc.start()
        ssc.awaitTermination()

Code for the Enrichment Job is as follows:

class EnrichmentJob:

    cache = {}

    @staticmethod
    def enrich(data):

        # Assume that the Cloudant connector is created using the available config
        cloudantConnector = CloudantConnector(config, config["cloudant"]["host"]["req_db_name"])
        final_data = []
        for row in data:
            id = row["id"]
            if id not in EnrichmentJob.cache:
                data = cloudantConnector.getOne({"id": id})
                row["data"] = data
                EnrichmentJob.cache[id] = data
            else:
                data = EnrichmentJob.cache[id]
                row["data"] = data
            final_data.append(row)

        cloudantConnector.close()

        return final_data

My question is - is there some way to maintain [1] "a global cache in main memory that is accessible to all workers", or [2] "local caches on each of the workers such that they remain persisted in the foreachRDD setting"?

I have already explored the following:

  1. Broadcast variables - here we go the [1] way. As I understand, they are meant to be read-only and immutable. I have checked out this reference, but it cites an example of unpersisting/re-persisting the broadcast variable. Is this a good practice? (A rough sketch of this pattern follows the list below.)

  2. Static variables - here we go the [2] way. The class that is being referred to ("Enricher" in this case) maintains a cache in the form of a static dictionary. But it turns out that the foreachRDD function spawns a completely new process for each incoming RDD, which discards the previously initialized static variable. This is the approach coded above.
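For reference, a minimal sketch of the unpersist-and-re-broadcast pattern mentioned in point 1. This is only my illustration, not code from the cited reference; load_reference_data is a hypothetical function that rebuilds the lookup dictionary on the driver.

def refresh_broadcast(sc, current_broadcast, load_reference_data):
    # Release the executor-side copies of the old broadcast, then ship a fresh one.
    if current_broadcast is not None:
        current_broadcast.unpersist()
    return sc.broadcast(load_reference_data())

# Inside the driver loop (e.g. once per batch in foreachRDD) one would do:
#     bcast = refresh_broadcast(sc, bcast, load_reference_data)
#     enriched = rdd.map(lambda row: dict(row, data=bcast.value.get(row["id"])))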

I have two possible solutions right now:

  1. Maintain an offline cache on the file system.
  2. Perform the entire computation of this enrichment task on my driver node. This would cause the entire data to end up on the driver and be maintained there, with the cache object sent to the enrichment job as an argument to the mapping function. (A rough sketch of this follows below.)

Obviously the first one looks better than the second, but I wish to confirm that these two are the only ways around this before committing to them. Any pointers would be appreciated!
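To make option 2 concrete, here is roughly what I have in mind; fetch_from_cloudant is a hypothetical driver-side lookup and the details are simplified:

driver_cache = {}

def process_on_driver(rdd):
    # Pull the whole batch to the driver and enrich it there, reusing a local dict cache.
    rows = rdd.collect()
    for row in rows:
        key = row["id"]
        if key not in driver_cache:
            driver_cache[key] = fetch_from_cloudant(key)
        row["data"] = driver_cache[key]
    # Re-parallelize only if downstream stages still need an RDD.
    return rdd.context.parallelize(rows)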

Answer

Is there some way to maintain [1] "a global cache on the main memory that is accessible to all workers"

No. There is no "main memory" which can be accessed by all workers. Each worker runs in a separate process and communicates with the external world over sockets, not to mention the separation between different physical nodes in non-local mode.

There are some techniques that can be applied to achieve a worker-scoped cache with memory-mapped data (using SQLite being the simplest one), but it takes some additional effort to implement correctly (to avoid conflicts and such).
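For illustration only, a minimal sketch of such an SQLite-backed, worker-scoped cache; lookup_remote is a hypothetical stand-in for the actual Cloudant call, and real code would additionally have to deal with locking and contention between concurrent tasks:

import sqlite3

def enrich_partition(rows, db_path="/tmp/enrich_cache.db"):
    # One connection per task; every executor process works against its own local file.
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE IF NOT EXISTS cache (id TEXT PRIMARY KEY, data TEXT)")
    for row in rows:
        key = str(row["id"])
        hit = conn.execute("SELECT data FROM cache WHERE id = ?", (key,)).fetchone()
        if hit is None:
            value = lookup_remote(row["id"])   # hypothetical remote lookup
            conn.execute("INSERT OR IGNORE INTO cache (id, data) VALUES (?, ?)", (key, value))
            conn.commit()
            row["data"] = value
        else:
            row["data"] = hit[0]
        yield row
    conn.close()

# Usage: rdd.mapPartitions(enrich_partition)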

or [2] "local caches on each of the workers such that they remain persisted in the foreachRDD setting"?

You can use standard caching techniques with scope limited to the individual worker processes. Depending on the configuration (static vs. dynamic resource allocation, spark.python.worker.reuse) it may or may not be preserved between multiple tasks and batches.

Consider the following simplified example:

  • counter_param.py :

from pyspark import AccumulatorParam
from collections import Counter

class CounterParam(AccumulatorParam):
    def zero(self, v: Counter) -> Counter:
        return Counter()

    def addInPlace(self, acc1: Counter, acc2: Counter) -> Counter:
        acc1.update(acc2)
        return acc1

  • my_utils.py :

    from pyspark import Accumulator
    from typing import Hashable
    from collections import Counter
    
    # Dummy cache. In production I would use functools.lru_cache 
    # but it is a bit more painful to show with accumulator
    cached = {} 
    
    def f_cached(x: Hashable, counter: Accumulator) -> Hashable:
        if cached.get(x) is None:
            cached[x] = True
            counter.add(Counter([x]))
        return x
    
    
    def f_uncached(x: Hashable, counter: Accumulator) -> Hashable:
        counter.add(Counter([x]))
        return x
    

  • main.py :

    from pyspark.streaming import StreamingContext
    from pyspark import SparkContext
    
    from counter_param import CounterParam
    import my_utils
    
    from collections import Counter
    
    def main():
        sc = SparkContext("local[1]")
        ssc = StreamingContext(sc, 5)
    
        cnt_cached = sc.accumulator(Counter(), CounterParam())
        cnt_uncached = sc.accumulator(Counter(), CounterParam())
    
        stream = ssc.queueStream([
            # Use single partition to show cache in work
            sc.parallelize(data, 1) for data in
            [[1, 2, 3], [1, 2, 5], [1, 3, 5]]
        ])
    
        stream.foreachRDD(lambda rdd: rdd.foreach(
            lambda x: my_utils.f_cached(x, cnt_cached)))
        stream.foreachRDD(lambda rdd: rdd.foreach(
            lambda x: my_utils.f_uncached(x, cnt_uncached)))
    
        ssc.start()
        ssc.awaitTerminationOrTimeout(15)
        ssc.stop(stopGraceFully=True)
    
        print("Counter cached {0}".format(cnt_cached.value))
        print("Counter uncached {0}".format(cnt_uncached.value))
    
    if __name__ == "__main__":
        main()
    

  • Example run:

    bin/spark-submit main.py
    

    Counter cached Counter({1: 1, 2: 1, 3: 1, 5: 1})
    Counter uncached Counter({1: 3, 2: 2, 3: 2, 5: 2})
    

    As you can see, we get the expected results:

    • For the cached version, the accumulator is updated only once per unique key per worker process (partition).
    • For the uncached version, the accumulator is updated every time a key occurs.
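    As the comment in my_utils.py hints, a production version of the per-worker cache could simply wrap the expensive lookup in functools.lru_cache instead of a hand-rolled dict. A minimal sketch (my addition; fetch_from_cloudant is a hypothetical lookup function):

    from functools import lru_cache

    @lru_cache(maxsize=10000)   # bounded per-process cache with LRU eviction
    def enrich_one(doc_id):
        # Called at most once per doc_id per worker process while the process is reused.
        return fetch_from_cloudant(doc_id)

    # Usage inside the stream:
    #     rdd.map(lambda row: dict(row, data=enrich_one(row["id"])))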
