如何将多处理池分配给Spark Worker [英] How to Distribute Multiprocessing Pool to Spark Workers

查看:104
本文介绍了如何将多处理池分配给Spark Worker的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用多处理并行读取100个csv文件(然后分别并行处理它们).这是我在AWS的EMR主节点上托管的Jupyter中运行的代码. (最终将是10万个csv文件,因此需要进行分布式读取).

I am trying to use multiprocessing to read 100 csv files in parallel (and subsequently process them separately in parallel). Here is my code running in Jupyter hosted on my EMR master node in AWS. (Eventually it will be 100k csv files hence the need for distributed reading).

import findspark
import boto3
from multiprocessing.pool import ThreadPool
import logging
import sys
findspark.init()
from pyspark import SparkContext, SparkConf, sql
conf = SparkConf().setMaster("local[*]")
conf.set('spark.scheduler.mode', 'FAIR')
sc = SparkContext.getOrCreate(conf)
spark = sql.SparkSession.builder.master("local[*]").appName("ETL").getOrCreate()
s3 = boto3.resource(...)
bucket = ''
bucketObj = s3.Bucket(bucket)
numNodes = 64
def processTest(key):
    logger.info(key + ' ---- Start\n')
    fLog = spark.read.option("header", "true") \
                         .option("inferSchema", "true") \
                         .csv(buildS3Path(bucket) + key)
    logger.info(key + ' ---- Finish Read\n')
    fLog = renameColumns(NAME_MAP, fLog)
    logger.info(key + ' ---- Finish Rename\n')
    (landLog, flags) = validate(fLog)
    logger.info(key + ' ---- Finish Validation\n')

files = list(bucketObj.objects.filter(Prefix=subfolder))
keys = list(map(lambda obj: obj.key, files))
keys = keys
# files = s3C.list_objects(Bucket=bucket, Prefix=subfolder)['Contents']
p = ThreadPool(numNodes)
p.map(process, keys)

除了只使用主节点,它运行良好.

It runs fine except that it only uses the master node.

蓝线是我的主节点上的CPU使用率. 所有日志都显示我正在一台计算机上运行:

The blue line is the CPU usage on my master node. All the logs show that I'm running on one machine:

 INFO:pyspark:172.31.29.33

我如何使火花分配给工人?

How do I make spark distribute the pool to the workers?

推荐答案

在仔细阅读SparkSession.Builder API文档时,传递给SparkSession.builder.master('xxxx')的字符串是通过以下命令连接到主节点的主机: ://xxxx:7077. 就像user8371915所说的那样,我不必位于独立的本地主机上.相反,此修复程序就像一个魅力:

In a closer read of the SparkSession.Builder API Docs, the string passed to the SparkSession.builder.master('xxxx') is the host in the connection to the master node via: spark://xxxx:7077. Like user8371915 said, I needed to not be on a standalone local master. Instead this fix worked like a charm:

SparkSession.builder.master('yarn')

查看全文

登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆