rdd相关内容

默认情况下,Spark Dataframe 是如何分区的?

我知道使用 HashPartitioner 根据键值对 RDD 进行分区.但是 Spark Dataframe 默认是如何分区的,因为它没有 key/value 的概念. 解决方案 Dataframe 的分区取决于运行以创建它的任务数量. 没有“默认"应用了分区逻辑.以下是如何设置分区的一些示例: 通过 val df = Seq(1 to 500000: _*).toDF() ..
发布时间:2021-11-14 22:47:19 其他开发

SPARK - 使用 RDD.foreach 创建数据帧并在数据帧上执行操作

我是 SPARK 的新手,正在想出更好的方法来实现以下场景.有一个包含 3 个字段的数据库表 - 类别、数量、数量.首先,我尝试从数据库中提取所有不同的类别. val 类别:RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString) 现在对于每个类别,我想执行流水线,它基本上从每个类别创建数据帧并应用一些机 ..
发布时间:2021-11-14 22:43:39 其他开发

Spark DataFrame 列名称未传递给从属节点?

我正在应用一个函数,比如说 f(),通过 map 方法到 DataFrame 的行(称之为 df),但是当我在结果 RDD 上调用 collect 时我看到 NullPointerException 如果 df.columns 作为参数传递给 f(). 以下 Scala 代码可以粘贴到 spark-shell 中,显示了该问题的一个最小示例(请参阅函数 prepRDD_buggy()).我还 ..
发布时间:2021-11-14 22:42:58 其他开发

使用 Spark SQL GROUP BY 对 DataFrame 进行高效的 PairRDD 操作

这个问题是关于聚合操作时DataFrame 和RDD 之间的二元性.在 Spark SQL 中,可以使用表生成 UDF 进行自定义聚合,但创建其中之一通常明显不如使用可用于 RDD 的聚合函数对用户友好,尤其是在不需要表输出的情况下. 是否有一种有效的方法可以将诸如 aggregateByKey 之类的配对 RDD 操作应用于使用 GROUP BY 分组或使用 ORDERED BY 排序的 ..
发布时间:2021-11-14 22:42:48 其他开发

对于“迭代算法",转换为 RDD 然后再转换回 Dataframe 有什么好处

我正在阅读高性能 Spark,作者提出以下声明: 虽然 Catalyst 优化器非常强大,但它目前遇到的挑战之一是非常大的查询计划.这些查询计划往往是迭代算法的结果,例如图算法或机器学习算法.一个简单的解决方法是在每次迭代结束时将数据转换为 RDD 并返回到 DataFrame/Dataset,如例 3-58 所示. 示例 3-58 被标记为“Round trip through RD ..
发布时间:2021-11-14 22:41:32 其他开发

Spark - 嵌套的 RDD 操作

我有两个 RDD 说 rdd1 =身份证 |创建 |摧毁|价钱1 |1 |2 |102 |1 |5 |113 |2 |3 |114 |3 |4 |125 |3 |5 |11rdd2 =[1,2,3,4,5] # 我们将这些值称为时间戳 (ts) rdd2 基本上是使用 range(intial_value, end_value, interval) 生成的.这里的参数可能会有所不同.大小可以 ..
发布时间:2021-11-14 22:26:26 Python

创建一个涉及 ArrayType 的 Pyspark Schema

我正在尝试为我的新 DataFrame 创建一个架构,并尝试了各种括号和关键字的组合,但一直无法弄清楚如何进行这项工作.我目前的尝试: from pyspark.sql.types import *架构 = 结构类型([StructField("用户", IntegerType()),数组类型(结构类型([StructField("user", StringType()),StructField ..
发布时间:2021-11-14 22:18:22 其他开发

Spark RDD 是否缓存在工作节点或驱动程序节点(或两者)上?

任何人都可以纠正我对 Spark 坚持的理解. 如果我们在 RDD 上执行了 cache(),它的值只会缓存在那些最初计算 RDD 的节点上.意思是,如果有一个由 100 个节点组成的集群,并且 RDD 是在第一个和第二个节点的分区中计算的.如果我们缓存了这个 RDD,那么 Spark 将只在第一个或第二个工作节点中缓存它的值.所以当这个 Spark 应用程序在后期尝试使用这个 RDD 时, ..
发布时间:2021-11-14 22:16:46 其他开发

如何使用 Scala 将 csv 字符串解析为 Spark 数据帧?

我想将包含字符串记录的 RDD 转换为 Spark 数据帧,如下所示. "Mike,2222-003330,NY,34"“凯特,3333-544444,洛杉矶,32"“艾比,4444-234324,MA,56".... 模式行不在同一个 RDD 中,而是在另一个变量中: val header = "name,account,state,age" 所以现在我的问题是,如何使用上述两个,在 S ..
发布时间:2021-11-14 22:12:30 其他开发

PySpark:将 SchemaRDD 映射到 SchemaRDD

我正在加载一个 JSON 对象文件作为 PySpark SchemaRDD.我想改变对象的“形状"(基本上,我将它们展平),然后插入到 Hive 表中. 我遇到的问题是以下返回一个 PipelinedRDD 而不是 SchemaRDD: log_json.map(flatten_function) (其中 log_json 是 SchemaRDD). 有没有办法保留类型,转换回所需 ..
发布时间:2021-11-14 22:01:07 其他开发

如何在多列上编写 Pyspark UDAF?

我在名为 end_stats_df 的 pyspark 数据框中有以下数据: values start end cat1 cat210 1 2 A B11 1 2 C B12 1 2 分贝510 1 2 直流550 1 2 C B500 1 2 A B80 1 3 A B 我想用以下方式聚合它: 我想使用“开始"和“结束"列作为聚合键 对于每组行,我需要执行以下操作: 计算该组的 c ..
发布时间:2021-11-14 22:00:23 其他开发