rdd相关内容
我知道使用 HashPartitioner 根据键值对 RDD 进行分区.但是 Spark Dataframe 默认是如何分区的,因为它没有 key/value 的概念. 解决方案 Dataframe 的分区取决于运行以创建它的任务数量. 没有“默认"应用了分区逻辑.以下是如何设置分区的一些示例: 通过 val df = Seq(1 to 500000: _*).toDF()
..
我想动态生成一个包含报告标题记录的数据框,因此根据以下字符串的值创建一个数据框: val headerDescs : String = "Name,Age,Location"val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
..
我是 SPARK 的新手,正在想出更好的方法来实现以下场景.有一个包含 3 个字段的数据库表 - 类别、数量、数量.首先,我尝试从数据库中提取所有不同的类别. val 类别:RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString) 现在对于每个类别,我想执行流水线,它基本上从每个类别创建数据帧并应用一些机
..
我正在应用一个函数,比如说 f(),通过 map 方法到 DataFrame 的行(称之为 df),但是当我在结果 RDD 上调用 collect 时我看到 NullPointerException 如果 df.columns 作为参数传递给 f(). 以下 Scala 代码可以粘贴到 spark-shell 中,显示了该问题的一个最小示例(请参阅函数 prepRDD_buggy()).我还
..
这个问题是关于聚合操作时DataFrame 和RDD 之间的二元性.在 Spark SQL 中,可以使用表生成 UDF 进行自定义聚合,但创建其中之一通常明显不如使用可用于 RDD 的聚合函数对用户友好,尤其是在不需要表输出的情况下. 是否有一种有效的方法可以将诸如 aggregateByKey 之类的配对 RDD 操作应用于使用 GROUP BY 分组或使用 ORDERED BY 排序的
..
我正在阅读高性能 Spark,作者提出以下声明: 虽然 Catalyst 优化器非常强大,但它目前遇到的挑战之一是非常大的查询计划.这些查询计划往往是迭代算法的结果,例如图算法或机器学习算法.一个简单的解决方法是在每次迭代结束时将数据转换为 RDD 并返回到 DataFrame/Dataset,如例 3-58 所示. 示例 3-58 被标记为“Round trip through RD
..
如何覆盖当我们节省时间时,RDD 输出对象任何现有路径. 测试 1: 975078|56691|2.000|20171001_926_570_1322975078|42993|1.690|20171001_926_570_1322975078|46462|2.000|20171001_926_570_1322975078|87815|1.000|20171001_926_570_1322r
..
我正在尝试将数据帧转换为 RDD,然后执行下面的一些操作以返回元组: df.rdd.map { t=>(t._2 + "_" + t._3 , t)}.take(5) 然后我得到了下面的错误.谁有想法?谢谢! :37: 错误:值 _2 不是 org.apache.spark.sql.Row 的成员(t._2 + "_" + t._3 , t)^ 解决方案 当你将 DataFrame 转换
..
我有两个 RDD 说 rdd1 =身份证 |创建 |摧毁|价钱1 |1 |2 |102 |1 |5 |113 |2 |3 |114 |3 |4 |125 |3 |5 |11rdd2 =[1,2,3,4,5] # 我们将这些值称为时间戳 (ts) rdd2 基本上是使用 range(intial_value, end_value, interval) 生成的.这里的参数可能会有所不同.大小可以
..
我正在尝试为我的新 DataFrame 创建一个架构,并尝试了各种括号和关键字的组合,但一直无法弄清楚如何进行这项工作.我目前的尝试: from pyspark.sql.types import *架构 = 结构类型([StructField("用户", IntegerType()),数组类型(结构类型([StructField("user", StringType()),StructField
..
任何人都可以纠正我对 Spark 坚持的理解. 如果我们在 RDD 上执行了 cache(),它的值只会缓存在那些最初计算 RDD 的节点上.意思是,如果有一个由 100 个节点组成的集群,并且 RDD 是在第一个和第二个节点的分区中计算的.如果我们缓存了这个 RDD,那么 Spark 将只在第一个或第二个工作节点中缓存它的值.所以当这个 Spark 应用程序在后期尝试使用这个 RDD 时,
..
我在 HDFS 上有一个文本文件,我想将其转换为 Spark 中的数据帧. 我正在使用 Spark 上下文加载文件,然后尝试从该文件生成各个列. val myFile = sc.textFile("file.txt")val myFile1 = myFile.map(x=>x.split(";")) 这样做后,我正在尝试以下操作. myFile1.toDF() 我遇到了一个问题,因为
..
我想将包含字符串记录的 RDD 转换为 Spark 数据帧,如下所示. "Mike,2222-003330,NY,34"“凯特,3333-544444,洛杉矶,32"“艾比,4444-234324,MA,56".... 模式行不在同一个 RDD 中,而是在另一个变量中: val header = "name,account,state,age" 所以现在我的问题是,如何使用上述两个,在 S
..
当前的 Pyspark 数据帧具有以下结构(col2 的 WrappedArrays 列表): +---+--------------------------------------------------------------------+|id |col2 |+---+----------------------------------------------------+|a |[Wra
..
我对 Spark 和 Scala 比较陌生. 我从以下数据框开始(由密集的双精度向量组成的单列): scala>val scaledDataOnly_pruned = scaledDataOnly.select("features")scaledDataOnly_pruned: org.apache.spark.sql.DataFrame = [features: vector]标度>sc
..
我正在加载一个 JSON 对象文件作为 PySpark SchemaRDD.我想改变对象的“形状"(基本上,我将它们展平),然后插入到 Hive 表中. 我遇到的问题是以下返回一个 PipelinedRDD 而不是 SchemaRDD: log_json.map(flatten_function) (其中 log_json 是 SchemaRDD). 有没有办法保留类型,转换回所需
..
我在名为 end_stats_df 的 pyspark 数据框中有以下数据: values start end cat1 cat210 1 2 A B11 1 2 C B12 1 2 分贝510 1 2 直流550 1 2 C B500 1 2 A B80 1 3 A B 我想用以下方式聚合它: 我想使用“开始"和“结束"列作为聚合键 对于每组行,我需要执行以下操作: 计算该组的 c
..
Spark 文档 展示了如何从 RDD 创建 DataFrame,使用 Scala 案例类来推断模式.我正在尝试使用 sqlContext.createDataFrame(RDD, CaseClass) 重现这个概念,但我的 DataFrame 最终为空.这是我的 Scala 代码: //sc 是 SparkContext,而 sqlContext 是 SQLContext.//定义案例类和原始
..
一个函数应该对数据框中的多列执行 def handleBias(df: DataFrame, colName: String, target: String = target) = {val w1 = Window.partitionBy(colName)val w2 = Window.partitionBy(colName, target)df.withColumn("cnt_group",
..
我正在创建一个新的 DataFrame,其中包含来自 Join 的少量记录. valjoined_df = first_df.join(second_df, first_df.col("key") ===second_df.col("key") &&second_df.col("key").isNull, "left_outer")join_df.repartition(1)join_df.ca
..