How to run a function on all Spark workers before processing data in PySpark?

Problem description

I'm running a Spark Streaming task in a cluster using YARN. Each node in the cluster runs multiple spark workers. Before the streaming starts I want to execute a "setup" function on all workers on all nodes in the cluster.

The streaming task classifies incoming messages as spam or not spam, but before it can do that it needs to download the latest pre-trained models from HDFS to local disk, like this pseudo code example:

def fetch_models():
    # Pseudo code: refresh the local copy if HDFS holds a newer model version
    if hadoop.version > local.version:
        hadoop.download()

I've seen the following examples here on SO:

sc.parallelize().map(fetch_models)

But in Spark 1.6 parallelize() requires some data to be used, like this shitty work-around I'm doing now:

sc.parallelize(range(1, 1000)).map(fetch_models)

Just to be fairly sure that the function is run on ALL workers I set the range to 1000. I also don't exactly know how many workers are in the cluster when running.
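
For reference, map() is lazy, so a workaround like this only runs once an action is attached. A minimal sketch of what forcing it might look like (num_slots is purely an illustrative stand-in for the number of task slots, and even this does not guarantee that every executor receives a task):

# Rough sketch only: one task per partition, triggered by an action.
num_slots = sc.defaultParallelism              # illustrative proxy for the number of task slots
sc.parallelize(range(num_slots), num_slots) \
  .foreachPartition(lambda _: fetch_models())  # foreachPartition is an action, so the tasks actually run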

I've read the programming documentation and googled relentlessly but I can't seem to find any way to actually just distribute anything to all workers without any data.

After this initialization phase is done, the streaming task proceeds as usual, operating on incoming data from Kafka.

The way I'm using the models is by running a function similar to this:

spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams())\
    .repartition(spark_partitions)\
    .foreachRDD(lambda rdd: rdd.foreachPartition(lambda partition: spam.on_partition(config, partition)))

Theoretically I could check whether or not the models are up to date in the on_partition function, though it would be really wasteful to do this on each batch. I'd like to do it before Spark starts retrieving batches from Kafka, since the downloading from HDFS can take a couple of minutes...

Update:

To be clear: it's not an issue on how to distribute the files or how to load them, it's about how to run an arbitrary method on all workers without operating on any data.

To clarify what loading the models actually means at the moment:

def on_partition(config, partition):
    if not MyClassifier.is_loaded():
        MyClassifier.load_models(config)

    handle_partition(config, partition)

MyClassifier itself looks something like this:

class MyClassifier:
    clf = None

    @staticmethod
    def is_loaded():
        return MyClassifier.clf is not None

    @staticmethod
    def load_models(config):
        MyClassifier.clf = load_from_file(config)

Static methods are used because PySpark doesn't seem to be able to serialize classes with non-static methods (the state of the class is irrelevant to any other worker). Here we only have to call load_models() once, and on all future batches MyClassifier.clf will be set. This is something that really should not be done for each batch; it's a one-time thing. The same goes for downloading the files from HDFS with fetch_models().

Recommended answer

If all you want is to distribute a file between worker machines, the simplest approach is to use the SparkFiles mechanism:

some_path = ...  # local file, a file in DFS, an HTTP, HTTPS or FTP URI.
sc.addFile(some_path)

and retrieve it on the workers using SparkFiles.get and standard IO tools:

from pyspark import SparkFiles

# SparkFiles.get() resolves a file added with sc.addFile() to its local path on this worker
with open(SparkFiles.get(some_path)) as fw:
    ... # Do something

If you want to make sure that the model is actually loaded, the simplest approach is to load it on module import. Assuming config can be used to retrieve the model path:

  • model.py:

from pyspark import SparkFiles

config = ...
class MyClassifier:
    clf = None

    @staticmethod
    def is_loaded():
        return MyClassifier.clf is not None

    @staticmethod
    def load_models(config):
        path = SparkFiles.get(config.get("model_file"))
        MyClassifier.clf = load_from_file(path)

# Executed once per interpreter 
MyClassifier.load_models(config)  

  • main.py:

    from pyspark import SparkContext
    
    config = ...
    
    sc = SparkContext("local", "foo")
    
    # Executed before StreamingContext starts
    sc.addFile(config.get("model_file"))
    sc.addPyFile("model.py")
    
    import model
    
    ssc = ...
    stream = ...
    stream.map(model.MyClassifier.do_something).pprint()
    
    ssc.start()
    ssc.awaitTermination()
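
With the model loaded at module import time like this, the is_loaded() guard from the question's on_partition becomes unnecessary. A hypothetical sketch of how the per-partition handler could then look, reusing the names from the question (handle_partition is the question's own processing function):

import model  # first use on a worker imports the module and populates MyClassifier.clf

def on_partition(config, partition):
    # No is_loaded() check required: the classifier was initialized at import time.
    handle_partition(config, partition)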
    
