AWS EMR: Pyspark: Rdd: mappartitions: Could not find valid SPARK_HOME while searching: Spark closures


Problem description

I have a PySpark job that runs without any issues locally, but when it runs on the AWS cluster it gets stuck when it reaches the code below. The job only processes 100 records. "some_func1" posts data to a website and returns the response at the end. Any idea what's going wrong, or how I can debug this? FYI: "some_func1" is defined outside of the class; I guess the issue is related to ["closures"][1], but I'm not sure how to fix it.

response = attributes.mapPartitions(lambda iter: [some_func1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()

The complete code is below.

import argparse
import json
import sys

import requests
from pyspark.sql import SparkSession
from toolz import partition_all


def ctgs(entries):
    col1 = entries[0]
    col2 = entries[1]
    col3 = entries[2]

    rec = {
        'up_col1': col1,
        'up_col2': col2,
        'up_col3': col3
    }

    return rec

def some_func1(rec, dict_name, id):
    recs = {
        'rec_list': list(rec),
        'seid': id
    }
    headers = "some header"
    attrburl = "www.someurl.com"

    response = requests.post(attrburl, data=json.dumps(recs), headers=headers)

    return response


class Processor:
    def __init__(self, sc, arguments):
        self.sc = sc
        self.env = arguments.env
        self.dte = arguments.dte
        self.sendme = arguments.sendme

    def send_them(self, ext_data, dict_name, id):
        attributes = ext_data.rdd.map(lambda x: ctgs((x['col1'], x['col2'], x['col3'])))

        # the job gets stuck here when running on the cluster
        response = attributes.mapPartitions(lambda iter: [some_func1(map(lambda x: x, xs), dict_name, id) for xs in partition_all(50, iter)]).collect()

    def extr_data(self, dict_name, id):
        ext_data = self.sc.sql('''select col1, col2, col3 from table_name''')
        self.send_them(ext_data, dict_name, id)

    def process(self):
        dict_name = {'dict_id': '34343-3434-3433-343'}
        id = 'dfdfd-erere-dfd'
        self.extr_data(dict_name, id)



def argument_parsing(args):
    parser = argparse.ArgumentParser()
    parser.add_argument("--env", required=True)
    parser.add_argument("--dte", required=True)
    parser.add_argument("--sendme", required=False)
    args = parser.parse_args(args)
    return args


def main(args):

    arguments = argument_parsing(args)

    sc = SparkSession \
        .builder \
        .appName("job_name") \
        .enableHiveSupport() \
        .getOrCreate()

    sc.sparkContext.setLogLevel("ERROR")

    processor = Processor(sc, arguments)
    processor.process()


if __name__ == "__main__":
    main(sys.argv[1:])

Answer

You are correct, this is an issue with closures/executors.

Code inside mapPartitions runs on the executors when the job runs on a cluster. Running in 'local' mode obscures these kinds of bugs/errors, because it scopes all of the functions to the driver, which is running on your machine; there is no scope issue when running in 'local'.

There are two types of problems when dealing with closures/executors: your scoped variables not being serializable, and the environment that the executor runs in.
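As an illustration of the first kind of problem (a hedged sketch with made-up names, not the asker's actual code): anything referenced inside the function passed to mapPartitions is pickled and shipped to the executors, and connection-like objects can't survive that trip, so only plain values such as strings and dicts should be captured:

import socket

conn = socket.create_connection(("www.someurl.com", 443))   # created in the driver

def bad_partition(iter):
    # 'conn' is captured by this closure; an open socket cannot be pickled,
    # so Spark fails (or behaves oddly) when it ships this function to the executors
    conn.sendall(b"...")
    return iter

def good_partition(iter):
    # only the plain hostname string is captured; the connection is opened
    # on the executor itself
    s = socket.create_connection(("www.someurl.com", 443))
    s.close()
    return iter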

The environment check should be easy: can you connect to the URL from one of your executors if you just ssh in and try? (My bet is you are waiting on a URL lookup in DNS.) In fact, I'd suggest you start by checking the security group for the EMR cluster and seeing which nodes are allowed access.
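If ssh access is awkward, one hedged alternative (hypothetical helper names, using the placeholder URL from the question) is a tiny probe job that attempts the request on each partition and collects the results, so the executors report their own connectivity:

def probe(_):
    import socket
    import requests
    try:
        requests.head("https://www.someurl.com", timeout=10)
        return [(socket.gethostname(), "ok")]
    except Exception as e:
        return [(socket.gethostname(), repr(e))]

# one small partition per probe; the hostnames show which executor nodes were reached
print(sc.sparkContext.parallelize(range(8), 8).mapPartitions(probe).collect())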

The scope issue is a little more challenging. If requests is initiated in the global scope but is not serializable, that could cause a problem (you can't serialize an in-flight connection to a database/website). You could just initiate it inside of mapPartitions, and that would solve it. The thing is, this would usually fail immediately and doesn't really fit the issue you are describing; unless it is causing the Python interpreter to die and falsely report that it's waiting, I don't think this is the issue.
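If that does turn out to be the cause, a minimal sketch of the fix (assuming the same partition_all batching and placeholder URL from the question; post_partition is a made-up name) is to build the HTTP session inside the function handed to mapPartitions, so the closure only captures plain, picklable values:

def post_partition(dict_name, seid):
    # dict_name is kept for parity with the question's some_func1; it is unused there too
    def _run(iter):
        import json
        import requests
        from toolz import partition_all
        session = requests.Session()          # created on the executor, never pickled
        results = []
        for batch in partition_all(50, iter):
            resp = session.post("https://www.someurl.com",
                                data=json.dumps({'rec_list': list(batch), 'seid': seid}),
                                headers={'Content-Type': 'application/json'})
            results.append(resp.status_code)
        return results
    return _run

response = attributes.mapPartitions(post_partition(dict_name, id)).collect()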
