How to run independent transformations in parallel using PySpark?

Question

I am trying to run two functions that perform completely independent transformations on a single RDD in parallel using PySpark. What are some methods to do this?

from multiprocessing import Process

from pyspark import SparkContext
from pyspark.sql import HiveContext, SQLContext

def doXTransforms(sampleRDD):
    # (X transforms)
    pass

def doYTransforms(sampleRDD):
    # (Y transforms)
    pass

if __name__ == "__main__":
    sc = SparkContext(appName="parallelTransforms")
    sqlContext = SQLContext(sc)
    hive_context = HiveContext(sc)

    rows_rdd = hive_context.sql("select * from tables.X_table")

    # Attempt: run each set of transforms in its own OS process.
    p1 = Process(target=doXTransforms, args=(rows_rdd,))
    p1.start()
    p2 = Process(target=doYTransforms, args=(rows_rdd,))
    p2.start()
    p1.join()
    p2.join()
    sc.stop()

This does not work, and I now understand why it will not work. But is there any alternative way to make this work? Specifically, are there any Python/Spark-specific solutions?

Answer

Just use threads and make sure that the cluster has enough resources to process both tasks at the same time.

from threading import Thread
import time

def process(rdd, f):
    def delay(x):
        # Artificial per-element delay so the two jobs run long enough
        # to be seen overlapping in the Spark UI.
        time.sleep(1)
        return f(x)
    # Submits a Spark job (map + sum) from the calling thread.
    return rdd.map(delay).sum()


rdd = sc.parallelize(range(100), int(sc.defaultParallelism / 2))

# Each thread submits its own job; the driver runs both concurrently.
t1 = Thread(target=process, args=(rdd, lambda x: x * 2))
t2 = Thread(target=process, args=(rdd, lambda x: x + 1))
t1.start(); t2.start()

Arguably this is not that often useful in practice, but otherwise it should work just fine.

You can further use in-application scheduling with the FAIR scheduler and scheduler pools for better control over the execution strategy, as in the sketch below.
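
For illustration only (this sketch is not part of the original answer; the pool names "x_pool" and "y_pool", the run_in_pool helper, and the trivial transforms are made-up examples), one way to combine threads with FAIR scheduling is to enable spark.scheduler.mode=FAIR and tag the jobs submitted from each thread with their own pool via sc.setLocalProperty:

from threading import Thread

from pyspark import SparkConf, SparkContext

# Enable in-application FAIR scheduling; per-pool weights and minimum shares
# can optionally be defined in an allocation file referenced by
# spark.scheduler.allocation.file.
conf = SparkConf().setAppName("parallelTransforms").set("spark.scheduler.mode", "FAIR")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(100))

def run_in_pool(pool_name, f):
    # Jobs submitted from the calling thread are assigned to the named pool.
    sc.setLocalProperty("spark.scheduler.pool", pool_name)
    return rdd.map(f).sum()

t1 = Thread(target=run_in_pool, args=("x_pool", lambda x: x * 2))
t2 = Thread(target=run_in_pool, args=("y_pool", lambda x: x + 1))
t1.start(); t2.start()
t1.join(); t2.join()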

You can also try pyspark-asyncactions (disclaimer - the author of this answer is also the author of the package), which provides a set of wrappers around the Spark API and concurrent.futures:

import asyncactions  # importing the package adds *Async action methods to RDD
import concurrent.futures

# Each *Async call submits its job right away and returns a Future.
f1 = rdd.filter(lambda x: x % 3 == 0).countAsync()
f2 = rdd.filter(lambda x: x % 11 == 0).countAsync()

[x.result() for x in concurrent.futures.as_completed([f1, f2])]
