How to run multiple jobs in one SparkContext from separate threads in PySpark?


Question

It is understood from the Spark documentation about Scheduling Within an Application:

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By "job", in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark's scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users).

I could only find a few examples of this in Scala and Java. Can somebody give an example of how this can be implemented using PySpark?

Answer

I was running into the same issue, so I created a tiny self-contained example. I create multiple threads using Python's threading module and submit multiple Spark jobs simultaneously.

Note that by default, Spark runs jobs in first-in, first-out (FIFO) order: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. In the example below, I change it to FAIR scheduling.

# Prereqs:
# set 
# spark.dynamicAllocation.enabled         true
# spark.shuffle.service.enabled           true
# spark.scheduler.mode                    FAIR
# in spark-defaults.conf

import threading
from pyspark import SparkContext, SparkConf

def task(sc, i):
  print(sc.parallelize(range(i*10000)).count())

def run_multiple_jobs():
  conf = SparkConf().setMaster('local[*]').setAppName('appname')
  # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
  conf.set('spark.scheduler.mode', 'FAIR')
  sc = SparkContext(conf=conf)
  for i in range(4):
    t = threading.Thread(target=task, args=(sc, i))
    t.start()
    print('spark task', i, 'has started')


run_multiple_jobs()

Output:

spark task 0 has started
spark task 1 has started
spark task 2 has started
spark task 3 has started
30000
0 
10000
20000
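
As a follow-up to the answer above (not part of the original), here is a minimal sketch of two refinements that are often useful: joining the threads so the driver does not exit before the jobs finish, and giving each thread its own FAIR scheduler pool via SparkContext.setLocalProperty('spark.scheduler.pool', ...). The pool names and app name are illustrative, and per-thread pool isolation assumes a recent PySpark release where Python threads are pinned to distinct JVM threads.

import threading
from pyspark import SparkContext, SparkConf

def task_in_pool(sc, i):
  # The pool property is thread-local: jobs submitted from this thread go to "pool_<i>".
  sc.setLocalProperty('spark.scheduler.pool', 'pool_%d' % i)
  print(sc.parallelize(range(i*10000)).count())

def run_multiple_jobs_in_pools():
  conf = SparkConf().setMaster('local[*]').setAppName('appname')
  conf.set('spark.scheduler.mode', 'FAIR')
  sc = SparkContext(conf=conf)
  threads = [threading.Thread(target=task_in_pool, args=(sc, i)) for i in range(4)]
  for t in threads:
    t.start()
  for t in threads:
    t.join()  # wait for all jobs so the driver does not shut down early
  sc.stop()

run_multiple_jobs_in_pools()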

