How to run a function on all Spark workers before processing data in PySpark?

Problem description

I'm running a Spark Streaming task in a cluster using YARN. Each node in the cluster runs multiple spark workers. Before the streaming starts I want to execute a "setup" function on all workers on all nodes in the cluster.

The streaming task classifies incoming messages as spam or not spam, but before it can do that it needs to download the latest pre-trained models from HDFS to local disk, like this pseudo code example:

def fetch_models():
    if hadoop.version > local.version:
        hadoop.download()
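
Purely as an illustration, here is one way such a fetch_models() helper could be fleshed out. The HDFS path, the local path, and the VERSION marker file are made up for the example, and it assumes the hdfs command-line client is available on every worker node:

import os
import shutil
import subprocess

# hypothetical locations; adjust for your deployment
HDFS_MODEL_DIR = "/models/spam"          # model files plus a VERSION marker file
LOCAL_MODEL_DIR = "/var/lib/spam-models"

def fetch_models():
    """Download the latest models from HDFS if the local copy is missing or stale."""
    hdfs_version = subprocess.check_output(
        ["hdfs", "dfs", "-cat", HDFS_MODEL_DIR + "/VERSION"]).decode("utf-8").strip()

    local_version = ""
    version_path = os.path.join(LOCAL_MODEL_DIR, "VERSION")
    if os.path.exists(version_path):
        with open(version_path) as f:
            local_version = f.read().strip()

    if hdfs_version != local_version:
        # download into a temporary directory first, then swap it into place
        tmp_dir = LOCAL_MODEL_DIR + ".tmp"
        if os.path.isdir(tmp_dir):
            shutil.rmtree(tmp_dir)
        subprocess.check_call(["hdfs", "dfs", "-get", HDFS_MODEL_DIR, tmp_dir])
        if os.path.isdir(LOCAL_MODEL_DIR):
            shutil.rmtree(LOCAL_MODEL_DIR)
        os.rename(tmp_dir, LOCAL_MODEL_DIR)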

I've seen the following examples here on SO:

sc.parallelize().map(fetch_models)

But in Spark 1.6 parallelize() requires some data to be used, like this shitty work-around I'm doing now:

# an action such as count() is needed, since map() alone is lazy and never runs
sc.parallelize(range(1, 1000)).map(lambda _: fetch_models()).count()

Just to be fairly sure that the function is run on ALL workers, I set the range to 1000. I also don't know exactly how many workers are in the cluster at runtime.
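
For what it's worth, a common variant of this workaround (Spark offers no direct "run this on every worker" API) is to create one partition per available core and run the setup through foreachPartition, which is an action and therefore actually executes. The setup function has to be idempotent, since it can run more than once per node; this is only a rough sketch assuming sc is the existing SparkContext and fetch_models is defined as above:

# rough sketch: run the setup roughly once per executor core
num_slots = sc.defaultParallelism  # total number of cores Spark knows about
sc.parallelize(range(num_slots), num_slots) \
    .foreachPartition(lambda _: fetch_models())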

I've read the programming documentation and googled relentlessly but I can't seem to find any way to actually just distribute anything to all workers without any data.

After this initialization phase is done, the streaming task operates as usual on incoming data from Kafka.

The way I'm using the models is by running a function similar to this:

spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams()) \
    .repartition(spark_partitions) \
    .foreachRDD(lambda rdd: rdd.foreachPartition(
        lambda partition: spam.on_partition(config, partition)))

Theoretically I could check whether the models are up to date in the on_partition function, though it would be really wasteful to do this on each batch. I'd like to do it before Spark starts retrieving batches from Kafka, since the download from HDFS can take a couple of minutes...

Update:

To be clear: it's not a question of how to distribute the files or how to load them; it's about how to run an arbitrary method on all workers without operating on any data.

To clarify what loading the models currently means:

def on_partition(config, partition):
    if not MyClassifier.is_loaded():
        MyClassifier.load_models(config)

    handle_partition(config, partition)

While MyClassifier is something like this:

class MyClassifier:
    clf = None

    @staticmethod
    def is_loaded():
        return MyClassifier.clf is not None

    @staticmethod
    def load_models(config):
        MyClassifier.clf = load_from_file(config)

Static methods, since PySpark doesn't seem to be able to serialize classes with non-static methods (the state of the class is irrelevant to other workers). Here we only have to call load_models() once, and on all future batches MyClassifier.clf will be set. This is something that really should not be done for each batch; it's a one-time thing. The same goes for downloading the files from HDFS with fetch_models().

Answer

If all you want is to distribute a file between worker machines, the simplest approach is to use the SparkFiles mechanism:

some_path = ...  # local file, a file in DFS, an HTTP, HTTPS or FTP URI.
sc.addFile(some_path)

and retrieve it on the workers using SparkFiles.get and standard IO tools:

import os

from pyspark import SparkFiles

# SparkFiles.get expects the file name as it was added, not the original full path
with open(SparkFiles.get(os.path.basename(some_path))) as f:
    ...  # Do something

If you want to make sure that the model is actually loaded, the simplest approach is to load it on module import. Assuming config can be used to retrieve the model path:

  • model.py:

from pyspark import SparkFiles

config = ...
class MyClassifier:
    clf = None

    @staticmethod
    def is_loaded():
        return MyClassifier.clf is not None

    @staticmethod
    def load_models(config):
        path = SparkFiles.get(config.get("model_file"))
        MyClassifier.clf = load_from_file(path)

# Executed once per interpreter 
MyClassifier.load_models(config)  

  • main.py:

    from pyspark import SparkContext
    
    config = ...
    
    sc = SparkContext("local", "foo")
    
    # Executed before StreamingContext starts
    sc.addFile(config.get("model_file"))
    sc.addPyFile("model.py")
    
    import model
    
    ssc = ...
    stream = ...
    stream.map(model.MyClassifier.do_something).pprint()
    
    ssc.start()
    ssc.awaitTermination()
    
