rdd相关内容
我有一个包含60多亿行数据的Spark RDD,我想使用Train_on_Batch来训练深度学习模型。我不能将所有行都放入内存中,所以我希望一次获得10K左右的内存,以批处理成64或128个的块(取决于型号大小)。我目前使用的是rdd.Sample(),但我认为这不能保证我会得到所有行。有没有更好的方法来划分数据,使其更易于管理,这样我就可以编写一个生成器函数来获取批处理?我的代码如下:
..
list_1 = [[6, [3, 8, 7]], [5, [9, 7, 3]], [6, [7, 8, 5]], [5, [6, 7, 2]]] rdd1 = sc.parallelize(list_1) newpairRDD = rdd1.partitionBy(2,lambda k: int(k[0])) print("Partitions structure: {}".format(ne
..
我目前正在为数据工程师的工作面试做准备。我陷入了困惑之中。 以下是详细信息。 如果Spark RDDS本质上是不可变的,那么为什么我们能够使用var创建Spark RDD? 推荐答案 您的困惑与Spark的RDDS没有什么关系。这将有助于理解变量和对象之间的区别。一个更熟悉的例子: 假设您有一个字符串,我们都知道它是一个不可变类型: var text = "abc"
..
我有以下pyspark代码来计算文件夹中每个文件的SHA1散列。我使用spark.sparkContext.binaryFiles来获取RDD对,其中键是文件名,值是一个类似文件的对象,我正在计算映射函数rdd.mapValues(map_hash_file)中的散列。然而,我在倒数第二行收到了下面的错误,我不明白--请问如何解决这个问题?谢谢 错误: org.apache.spark.Sp
..
感谢您在这里提供的帮助。使用Pyspark(请不能使用SQL)。因此,我有一个存储为RDD对的元组列表: [((‘City1’,‘2020-03-27’,‘X1’),44), (‘City1’,‘2020-03-28’,‘X1’),44), (‘City3’,‘2020-03-28’,‘X3’),15), ((‘City4’,‘2020-03-27’,‘X4’),5),
..
问题 我最近在Azure Data Lake Analytics遇到了一个挑战,当时我试图读入一个大型的UTF-8 JSON数组文件,并切换到HDInsight PySpark(v2.x,而不是3)来处理该文件。该文件大小约为110G,具有约150M个JSON对象。 HDInsight PySpark似乎不支持数组的JSON文件格式的输入,所以我被卡住了。另外,我还有“许多”这样的文件
..
创建表格- CREATE TABLE test.word_groups (group text, word text, count int,PRIMARY KEY (group,word)); 插入数据- INSERT INTO test.word_groups (group , word , count ) VALUES ( 'A-group', 'raj', 0) ; INSE
..
RDD上没有isEmpty方法,那么测试RDD是否为空的最有效方法是什么? 推荐答案 RDD.isEmpty()将成为Spark 1.3.0的一部分。 根据this apache mail-thread中的建议和后来对这个答案的一些评论,我做了一些小的本地实验。最好的方法是使用take(1).length==0。 def isEmpty[T](rdd : RDD[T]) =
..
我在本地系统上使用的是Spark 1.4.0。每当我创建一个RDD并通过Spark的Scala外壳对其调用Collect时,它都工作得很好。但是,当我创建一个独立的应用程序并在RDD上调用‘Collect’操作时,我看不到结果,尽管运行期间的Spark消息说已经为驱动程序设置了一定数量的字节:- INFO Executor: Finished task 0.0 in stage 0.0 (T
..
我有两个 RDD[Array[String]],我们称它们为 rdd1 和 rdd2.我将创建一个新的 RDD,其中仅包含 rdd2 的条目,而不是 rdd1(基于键).我通过 Intellij 在 Scala 上使用 Spark. 我用一个键将rdd1和rdd2分组(我将只比较两个rdds的键): val rdd1Grouped = rdd1.groupBy(line => line(0
..
我正在尝试学习 Spark 中的数据集.我想不通的一件事是如何显示 KeyValueGroupedDataset,因为 show 对它不起作用.另外,KeyValuGroupedDataSet 的 map 等价物是什么?如果有人举一些例子,我将不胜感激. 解决方案 好的,我从给出的示例中得到了这个想法 这里 和 这里.我在下面给出一个我写的简单例子. val x = Seq(("a",
..
我正在尝试深入了解 spark shuffle 过程.当我开始阅读时,我遇到了以下几点. Spark 在完成时将 Map 任务 (ShuffleMapTask) 输出直接写入磁盘. 我想了解以下关于 Hadoop MapReduce 的内容. 如果 Map-Reduce 和 Spark 都将数据写入本地磁盘,那么 spark shuffle 过程与 Hadoop MapReduc
..
我正在编写一个 Spark 应用程序,并希望将一组键值对 (K, V1), (K, V2), ..., (K, Vn) 组合成一个键-多值对(K, [V1, V2, ..., Vn]).我觉得我应该能够使用具有某种风味的 reduceByKey 函数来做到这一点: My_KMV = My_KV.reduce(lambda a, b: a.append([b])) 发生这种情况时我得到的错误是:
..
我正在尝试在 Spark 中实现 K 最近邻算法.我想知道是否可以使用嵌套的 RDD.这会让我的生活轻松很多.考虑以下代码片段. public static void main (String[] args){//等等等等代码JavaRDDtemp1 = testData.map(new Function(){公共双重调用(最终向量 z)抛出异常{JavaRDDt
..
我有一个微妙的 Spark 问题,我就是无法解决这个问题. 我们有两个 RDD(来自 Cassandra).RDD1 包含Actions,RDD2 包含Historic 数据.两者都有一个可以匹配/加入的 ID.但问题是这两个表有 N:N 关系.Actions 包含多个具有相同 id 的行,Historic 也是如此.以下是两个表中的一些示例日期. Actions 时间实际上是一个时间
..
我有一个 cassandra 表,其中有一个名为 snapshot 的文本字段,其中包含 JSON 对象: [标识符、时间戳、快照] 我知道为了能够使用 Spark 对该字段进行转换,我需要将该 RDD 的该字段转换为另一个 RDD 以对 JSON 模式进行转换. 这样对吗?我应该如何处理? 编辑:现在我设法从单个文本字段创建一个 RDD: val conf = new Spar
..
如何在连接两个数据框时给出更多的列条件.例如我想运行以下: val Lead_all = Leads.join(Utm_Master,Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==Utm_Master.columns("LeadSource","Utm_Source","Utm_Mediu
..
您可以在此处查看实现:https://github.com/apache/spark/blob/ffa05c84fe75663fc33f3d954d1cb1e084ab3280/python/pyspark/rdd.py#L804 它与“普通"reduce 函数有何不同? depth = 2 是什么意思? 我不希望 reducer 函数在分区上线性传递,但首先减少每个可用的对,然后将
..
我有一个 RDD,我想将它转换为 pandas dataframe.我知道将 RDD 转换为普通的 dataframe 我们可以做 df = rdd1.toDF() 但我想将 RDD 转换为 pandas dataframe 而不是普通的 dataframe.我该怎么做? 解决方案 您可以使用函数 toPandas(): 将此 DataFrame 的内容作为 Pandas pan
..
返回与 spark RDD 中每个唯一键关联的最大行(值)的最佳方法是什么? 我正在使用 python 并且我尝试过 Math max,通过键和聚合进行映射和减少.有没有一种有效的方法来做到这一点?可能是 UDF? 我有 RDD 格式: [(v, 3),(v, 1),(v, 1),(w, 7),(w, 1),(x, 3),(y, 1),(y, 1),(y, 2),(y, 3)] 我
..