Why does RDD.foreach fail with "SparkException: This RDD lacks a SparkContext"?

Problem Description

I have a dataset (as an RDD) that I divide into 4 RDDs by using different filter operators.

 val RSet = datasetRdd.
   flatMap(x => RSetForAttr(x, alLevel, hieDict)).
   map(x => (x, 1)).
   reduceByKey((x, y) => x + y)
 val Rp:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("Rp"))
 val Rc:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("Rc"))
 val RpSv:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("RpSv"))
 val RcSv:RDD[(String, Int)] = RSet.filter(x => x._1.split(",")(0).equals("RcSv"))

I send Rp and RpSv to the following function, calculateEntropy:

def calculateEntropy(Rx: RDD[(String, Int)], RxSv: RDD[(String, Int)]): Map[Int, Map[String, Double]] = {
  RxSv.foreach { item =>
    val string = item._1.split(",")
    val t = Rx.filter(x => x._1.split(",")(2).equals(string(2)))
    .
    .
  }
}

I have two questions:

1- When I run a loop operation on RxSv like this:

RxSv.foreach { item => { ... } }

it collects all the items from every partition, but I only want the items of the partition the task is currently in. If you are going to suggest the map function instead: I am not changing anything in the RDD.

So when I run the code on a cluster with 4 workers and a driver, the dataset is divided into 4 partitions and each worker runs the code. But, for example, when I use the foreach loop as specified in the code, the driver collects all the data from the workers.

2- I have encountered a problem with this code:

val t = Rx.filter(x => x._1.split(",")(2).equals(abc(2)))

The error is:

org.apache.spark.SparkException: This RDD lacks a SparkContext.


在以下情况下可能会发生:

(1)驱动程序不调用RDD transformationsactions,但在其他转换内部;
例如,rdd1.map(x => rdd2.values.count() * x)无效,因为不能在rdd1.map transformation内部执行值transformationcount action.有关更多信息,请参阅SPARK-5063.

(2)当Spark Streaming作业从检查点恢复时,如果在DStream操作中使用了对未由流作业定义的RDD的引用,则会发生此异常.有关更多信息,请参见SPARK-13​​758.


It could happen in the following cases:

(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations;
for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.

Recommended Answer

First of all, I'd highly recommend caching the first RDD using the cache operator.

RSet.cache

That will avoid scanning and transforming your dataset every time you filter for the other RDDs: Rp, Rc, RpSv and RcSv.

Quoting cache:

cache(): Persist this RDD with the default storage level (MEMORY_ONLY).

The performance should increase.
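
As a minimal sketch (the count call is optional and not part of the original answer), once the parent is cached the four filters read from memory instead of recomputing the flatMap/reduceByKey lineage:

    RSet.cache()   // equivalent to persist(StorageLevel.MEMORY_ONLY)
    RSet.count()   // optional: run one action up front so the filters below hit the cache
    // the subsequent RSet.filter(...) calls for Rp, Rc, RpSv and RcSv now read cached data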

Secondly, I'd be very careful using the term "partition" to refer to a filtered RDD, since the term has a special meaning in Spark.

Partitions say how many tasks Spark executes for an action. They are hints for Spark so you, a Spark developer, can fine-tune your distributed pipeline.

The pipeline is distributed across the cluster nodes, with one or more Spark executors, according to the partitioning scheme. If you decide to have a single partition in an RDD, then once you execute an action on that RDD, you'll have one task on one executor.
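
As a small illustrative sketch (the coalesce call is hypothetical and not part of the original code), a one-partition RDD means one task per result stage of an action:

    val onePartition = RSet.coalesce(1)      // hypothetical: squeeze everything into one partition
    println(onePartition.getNumPartitions)   // 1
    onePartition.count()                     // the result stage runs as a single task on one executor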

The filter transformation does not change the number of partitions (in other words, it preserves partitioning). The number of partitions, i.e. the number of tasks, is exactly the number of partitions of RSet.
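
You can verify this directly; a quick sketch using the RDDs defined above:

    println(RSet.getNumPartitions)   // e.g. 4 when the input was split into 4 partitions
    println(Rp.getNumPartitions)     // the same number: filter preserved the partitioning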

1- When I loop over RxSv it collects all the items from every partition, but I only want the items of the partition I am in

You are. Don't worry about it, as Spark will execute the task on the executors where the data lives. foreach is an action that does not collect items; it describes a computation that runs on the executors, with the data distributed across the cluster (as partitions).
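
To make the distinction concrete, here is a small sketch on the RDDs above: output from foreach ends up on the executors, while only an action like collect ships data back to the driver:

    RSet.foreach { case (k, n) => println(s"$k -> $n") }   // runs on the executors; in cluster deployments the output goes to executor stdout
    val onDriver = RSet.collect()                          // this is what actually moves data to the driver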

If you want to process all the items of a partition at once, use foreachPartition:

foreachPartition: Applies a function f to each partition of this RDD.
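
A minimal sketch of how foreachPartition is used (each partition arrives as an Iterator and is processed locally on the executor that holds it):

    RxSv.foreachPartition { partition =>
      partition.foreach { case (key, count) =>
        // every record seen here belongs to the same partition and is handled on one executor
        println(s"$key -> $count")
      }
    }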


2- I have encountered a problem with this code

In the following lines of your code:

    RxSv.foreach{item => {
           val string = item._1.split(",")
           val t = Rx.filter(x => x._1.split(",")(2).equals(string(2)))

you are executing the foreach action, which in turn uses Rx, which is an RDD[(String, Int)]. This is not allowed (and, were it possible, it should not have compiled).

The reason for this behaviour is that an RDD is a data structure that merely describes what happens to the dataset when an action is executed, and it lives on the driver (the orchestrator). The driver uses the data structure to track the data sources, the transformations and the number of partitions.
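
For example (just a sketch on the RDDs above), the description the driver keeps can be inspected with toDebugString:

    println(RSet.toDebugString)   // prints the flatMap -> map -> reduceByKey lineage and the partition counts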

An RDD as an entity is gone (it disappears) when the driver spawns tasks on executors.

And when the tasks run, nothing is available to help them know how to run RDDs that are part of their work; hence the error. Spark is very cautious about it and checks for such anomalies before they could cause issues once the tasks are executed.
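
As a hedged sketch that is not part of the original answer (and only applies if Rx is small enough to fit in driver memory), one common way around the nested-RDD restriction is to collect Rx grouped by the lookup field and broadcast it, so the lookup inside foreach becomes a plain Map access instead of a call to Rx.filter:

    // Sketch only: assumes Rx comfortably fits in driver memory.
    val rxByField: Map[String, Iterable[(String, Int)]] =
      Rx.groupBy(x => x._1.split(",")(2))   // group by the field used in the filter
        .collectAsMap()                     // materialize on the driver
        .toMap

    val rxBroadcast = RxSv.sparkContext.broadcast(rxByField)   // one read-only copy per executor

    RxSv.foreach { case (key, _) =>
      val field = key.split(",")(2)
      val t = rxBroadcast.value.getOrElse(field, Nil)   // executor-local lookup, no nested RDD
      // ... continue the entropy calculation with t
    }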
