How to run multiple jobs in one SparkContext from separate threads in PySpark?


Question

It is understood from the Spark documentation:

"Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By "job", in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark's scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)."

I could find a few example code snippets of the same in Scala and Java. Can somebody give an example of how this can be implemented using PySpark?

Answer

Today, I was asking myself the same question. The multiprocessing module offers a ThreadPool, which spawns a few threads for you and hence runs the jobs in parallel. First define the function, then create the Pool, and then map it over the range you want to iterate over.
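
Before the concrete example below, the bare pattern looks roughly like this (a minimal sketch of my own, not from the original answer: it assumes an already running SparkContext sc, and some_rdd and the job body are just placeholders):

from multiprocessing.pool import ThreadPool

some_rdd = sc.parallelize(range(10000))

def run_job(i):
    # Each call triggers its own Spark job via the sum() action.
    return (i, some_rdd.map(lambda x: x * i).sum())

pool = ThreadPool(processes=4)
# The four worker threads submit their jobs concurrently to the one SparkContext.
results = pool.map(run_job, range(1, 5))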

In my case, I was calculating these WSSSE numbers for different numbers of centers (hyperparameter tuning) to get a "good" k-means clustering ... just like it is outlined in the MLSpark documentation. Without further explanation, here are some cells from my IPython worksheet:

from pyspark.mllib.clustering import KMeans
import numpy as np

c_points are 12-dimensional arrays:

>>> c_points.cache()
>>> c_points.take(3)
[array([ 1, -1,  0,  1,  0,  0,  0,  0,  0,  0,  0,  0]),
array([-2,  0,  0,  1,  0,  0,  0,  0,  0,  0,  0,  0]),
array([ 7, -1,  1,  0,  0,  0,  0,  0,  0,  0,  0,  0])]

In the following, for each i I'm computing this WSSSE value and returning it as a tuple:

def error(point, clusters):
    # Distance from a point to its closest cluster center.
    center = clusters.centers[clusters.predict(point)]
    return np.linalg.norm(point - center)

def calc_wssse(i):
    # Train a k-means model with i centers and sum the per-point errors.
    clusters = KMeans.train(c_points, i, maxIterations=20,
        runs=20, initializationMode="random")
    WSSSE = c_points\
        .map(lambda point: error(point, clusters))\
        .reduce(lambda x, y: x + y)
    return (i, WSSSE)

Here comes the interesting part:

from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=4)

Running it:

wssse_points = tpool.map(calc_wssse, range(1, 30))
wssse_points

gives:

[(1, 195318509740785.66),
 (2, 77539612257334.33),
 (3, 78254073754531.1),
 ...
]
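
As a small follow-up of my own (not part of the original answer): the tuples come back in the order of the mapped range, so the pool can be closed once the jobs are done and the WSSSE-vs-k curve inspected, e.g. for an "elbow", to pick a good number of centers:

# Hypothetical follow-up, not in the original answer.
tpool.close()
tpool.join()

# Inspect the WSSSE for each number of centers k.
for k, wssse in wssse_points:
    print(k, wssse)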

