rdd相关内容

如何使用PYSPARK从Spark获得批次行

我有一个包含60多亿行数据的Spark RDD,我想使用Train_on_Batch来训练深度学习模型。我不能将所有行都放入内存中,所以我希望一次获得10K左右的内存,以批处理成64或128个的块(取决于型号大小)。我目前使用的是rdd.Sample(),但我认为这不能保证我会得到所有行。有没有更好的方法来划分数据,使其更易于管理,这样我就可以编写一个生成器函数来获取批处理?我的代码如下: ..
发布时间:2022-07-15 23:08:08 Python

火花RDD不变性混淆

我目前正在为数据工程师的工作面试做准备。我陷入了困惑之中。 以下是详细信息。 如果Spark RDDS本质上是不可变的,那么为什么我们能够使用var创建Spark RDD? 推荐答案 您的困惑与Spark的RDDS没有什么关系。这将有助于理解变量和对象之间的区别。一个更熟悉的例子: 假设您有一个字符串,我们都知道它是一个不可变类型: var text = "abc" ..
发布时间:2022-04-13 20:06:53 其他开发

计算文件哈希/校验和的代码不起作用

我有以下pyspark代码来计算文件夹中每个文件的SHA1散列。我使用spark.sparkContext.binaryFiles来获取RDD对,其中键是文件名,值是一个类似文件的对象,我正在计算映射函数rdd.mapValues(map_hash_file)中的散列。然而,我在倒数第二行收到了下面的错误,我不明白--请问如何解决这个问题?谢谢 错误: org.apache.spark.Sp ..
发布时间:2022-04-13 20:03:55 其他开发

PYSpark显示最大值(S)和多重排序

感谢您在这里提供的帮助。使用Pyspark(请不能使用SQL)。因此,我有一个存储为RDD对的元组列表: [((‘City1’,‘2020-03-27’,‘X1’),44), (‘City1’,‘2020-03-28’,‘X1’),44), (‘City3’,‘2020-03-28’,‘X3’),15), ((‘City4’,‘2020-03-27’,‘X4’),5), ..
发布时间:2022-04-13 20:02:43 Python

如何在PySpark中读取大型JSON数组文件

问题 我最近在Azure Data Lake Analytics遇到了一个挑战,当时我试图读入一个大型的UTF-8 JSON数组文件,并切换到HDInsight PySpark(v2.x,而不是3)来处理该文件。该文件大小约为110G,具有约150M个JSON对象。 HDInsight PySpark似乎不支持数组的JSON文件格式的输入,所以我被卡住了。另外,我还有“许多”这样的文件 ..
发布时间:2022-04-13 20:00:52 其他开发

Spark:测试RDD是否为空的有效方法

RDD上没有isEmpty方法,那么测试RDD是否为空的最有效方法是什么? 推荐答案 RDD.isEmpty()将成为Spark 1.3.0的一部分。 根据this apache mail-thread中的建议和后来对这个答案的一些评论,我做了一些小的本地实验。最好的方法是使用take(1).length==0。 def isEmpty[T](rdd : RDD[T]) = ..
发布时间:2022-04-13 19:56:51 其他开发

';收集';在Spark独立应用程序的驱动程序窗口中未显示操作结果

我在本地系统上使用的是Spark 1.4.0。每当我创建一个RDD并通过Spark的Scala外壳对其调用Collect时,它都工作得很好。但是,当我创建一个独立的应用程序并在RDD上调用‘Collect’操作时,我看不到结果,尽管运行期间的Spark消息说已经为驱动程序设置了一定数量的字节:- INFO Executor: Finished task 0.0 in stage 0.0 (T ..
发布时间:2022-04-13 19:54:02 其他开发

比较两个 RDD

我有两个 RDD[Array[String]],我们称它们为 rdd1 和 rdd2.我将创建一个新的 RDD,其中仅包含 rdd2 的条目,而不是 rdd1(基于键).我通过 Intellij 在 Scala 上使用 Spark. 我用一个键将rdd1和rdd2分组(我将只比较两个rdds的键): val rdd1Grouped = rdd1.groupBy(line => line(0 ..
发布时间:2022-01-25 09:28:23 其他开发

如何在 Spark 中显示 KeyValueGroupedDataset?

我正在尝试学习 Spark 中的数据集.我想不通的一件事是如何显示 KeyValueGroupedDataset,因为 show 对它不起作用.另外,KeyValuGroupedDataSet 的 map 等价物是什么?如果有人举一些例子,我将不胜感激. 解决方案 好的,我从给出的示例中得到了这个想法 这里 和 这里.我在下面给出一个我写的简单例子. val x = Seq(("a", ..
发布时间:2022-01-21 13:06:00 其他开发

为什么 Spark 将 Map 阶段输出保存到本地磁盘?

我正在尝试深入了解 spark shuffle 过程.当我开始阅读时,我遇到了以下几点. Spark 在完成时将 Map 任务 (ShuffleMapTask) 输出直接写入磁盘. 我想了解以下关于 Hadoop MapReduce 的内容. 如果 Map-Reduce 和 Spark 都将数据写入本地磁盘,那么 spark shuffle 过程与 Hadoop MapReduc ..
发布时间:2022-01-13 23:35:01 其他开发

是否可以在 Apache Spark 中创建嵌套的 RDD?

我正在尝试在 Spark 中实现 K 最近邻算法.我想知道是否可以使用嵌套的 RDD.这会让我的生活轻松很多.考虑以下代码片段. public static void main (String[] args){//等等等等代码JavaRDDtemp1 = testData.map(new Function(){公共双重调用(最终向量 z)抛出异常{JavaRDDt ..
发布时间:2022-01-07 13:16:16 Java开发

Spark:如何按时间范围加入 RDD

我有一个微妙的 Spark 问题,我就是无法解决这个问题. 我们有两个 RDD(来自 Cassandra).RDD1 包含Actions,RDD2 包含Historic 数据.两者都有一个可以匹配/加入的 ID.但问题是这两个表有 N:N 关系.Actions 包含多个具有相同 id 的行,Historic 也是如此.以下是两个表中的一些示例日期. Actions 时间实际上是一个时间 ..
发布时间:2021-12-31 18:12:17 其他开发

Spark JSON 文本字段到 RDD

我有一个 cassandra 表,其中有一个名为 snapshot 的文本字段,其中包含 JSON 对象: [标识符、时间戳、快照] 我知道为了能够使用 Spark 对该字段进行转换,我需要将该 RDD 的该字段转换为另一个 RDD 以对 JSON 模式进行转换. 这样对吗?我应该如何处理? 编辑:现在我设法从单个文本字段创建一个 RDD: val conf = new Spar ..
发布时间:2021-12-31 18:11:17 其他开发

了解 Spark 中的 treeReduce()

您可以在此处查看实现:https://github.com/apache/spark/blob/ffa05c84fe75663fc33f3d954d1cb1e084ab3280/python/pyspark/rdd.py#L804 它与“普通"reduce 函数有何不同? depth = 2 是什么意思? 我不希望 reducer 函数在分区上线性传递,但首先减少每个可用的对,然后将 ..
发布时间:2021-12-22 21:42:01 Python

获取 Spark RDD 中每个键的最大值

返回与 spark RDD 中每个唯一键关联的最大行(值)的最佳方法是什么? 我正在使用 python 并且我尝试过 Math max,通过键和聚合进行映射和减少.有没有一种有效的方法来做到这一点?可能是 UDF? 我有 RDD 格式: [(v, 3),(v, 1),(v, 1),(w, 7),(w, 1),(x, 3),(y, 1),(y, 1),(y, 2),(y, 3)] 我 ..
发布时间:2021-12-22 21:35:48 Python