spark-dataframe相关内容
我正在处理 spark 数据框,我需要对列进行分组,并将分组行的列值转换为元素数组作为新列.示例: 输入:员工 |地址------------------迈克尔 |纽约迈克尔 |新泽西州输出:员工 |地址------------------迈克尔 |(纽约,新泽西) 非常感谢任何帮助. 解决方案 这里有一个替代解决方案我已将数据帧转换为用于转换的 rdd,并使用 sqlContext.
..
我正在尝试在 spark (pyspark) 环境中使用 JSON 文件. 问题:无法在 Pyspark Dataframe 中将 JSON 转换为预期格式 第一个输入数据集: https://health.data.ny.gov/api/views/cnih-y5dw/rows.json 在这个文件中,元数据是在文件的开头定义的,带有“meta"标签.然后是带有“data
..
我有带有树结构的层次结构数据模型的数据表.例如:这是一个示例数据行: -------------------------------------------身份证 |姓名 |parentId |路径 |深度-------------------------------------55 |加拿大 |空|空|077 |安大略 |55 |/55 |1100|多伦多 |77 |/55/77 |2104|
..
背景 我有一个包含 (我认为是) 对 (String, String) 的数据框. 看起来像这样: >df.show|列 1 |Col2 ||一个 |[k1, v1]||一个 |[k2, v2]|>df.printSchema|-- _1: 字符串(可为空 = 真)|-- _2: struct (nullable = true)||-- _1: 字符串(可为空 = 真)||-- _2
..
我在 Web 服务器上有一个 .gz 文件,我想以流式方式使用该文件并将数据插入 Couchbase..gz 文件中只有一个文件,每行包含一个 JSON 对象. 由于 Spark 没有 HTTP 接收器,所以我自己写了一个(如下所示).我正在使用 Couchbase Spark 连接器 进行插入.但是,在运行时,该作业实际上并未插入任何内容.我怀疑这是由于我对 Spark 缺乏经验并且不知道
..
我用的是spark1.6.我尝试广播 RDD,但不确定如何访问数据帧中的广播变量? 我有两个数据框员工 &部门. 员工数据框 -------------------员工 ID |员工姓名 |Emp_Age------------------1 |约翰 |252 |大卫 |35 部门数据框 --------------------部门 ID |部门名称 |员工编号---------
..
写入分区文本文件之类的简单操作失败. dataDF.write.partitionBy("year", "month", "date").mode(SaveMode.Append).text("s3://data/test2/events/") 异常 - 16/07/06 02:15:05 错误 datasources.DynamicPartitionWriterContainer:中止任务
..
我需要将多个 csv 文件组合成一个对象(我假设是一个数据框),但它们都有不匹配的列,如下所示: CSV A store_location_key |product_key |收藏家_key |trans_dt |销售 |单位|转键 CSV B collector_key |trans_dt |store_location_key |product_key |销售 |单位|转键 CS
..
我有 10K 列和 7000 万行的 DF.我想计算 10K 列的均值和 corr.我做了下面的代码,但由于代码大小 64K 问题(https://issues.apache.org/jira/browse/SPARK-16845) 数据: region dept week sal val1 val2 val3 ... val10000美国 CS 1 1 2 1 1 ... 2美国 CS
..
我正在运行一个 Spark 作业,就逻辑而言,它的性能非常好.但是,当我使用 saveAsTextFile 将文件保存在 s3 存储桶中时,我的输出文件的名称采用 part-00000、part-00001 等格式.有没有办法改变输出文件名? 谢谢. 解决方案 在 Spark 中,您可以使用 saveAsNewAPIHadoopFile 并将 hadoop 配置中的 mapred
..
我有一个如下所示的数据框. itemName, itemCategory姓名 1, C0名称2,C1名称3,C0 我想将此数据框保存为分区镶木地板文件: df.write.mode("overwrite").partitionBy("itemCategory").parquet(path) 对于这个数据框,当我读回数据时,它会有itemCategory的数据类型字符串. 但是有时,我
..
我有两个数据框,我正在 5 列上执行外连接.以下是我的数据集示例. uniqueFundamentalSet|^|PeriodId|^|SourceId|^|StatementTypeCode|^|StatementCurrencyId|^|FinancialStatementLineItem.lineItemId|^|FinancialAsReportedLineItemName|^|Fina
..
我的输入数据集大约 150G.我正在设置 --conf spark.cores.max=100--conf spark.executor.instances=20--conf spark.executor.memory=8G--conf spark.executor.cores=5--conf spark.driver.memory=4G 但由于数据在执行者之间分布不均,我不断收到 容器因超
..
我正在创建一个 spark 作业,该作业需要使用 Python 编写的函数将一列添加到数据帧中.其余的处理使用 Scala 完成. 我找到了如何从 pyspark 调用 Java/Scala 函数的示例: https://community.hortonworks.com/questions/110844/is-it-possible-to-call-a-scala-function-
..
我有两个数据框,df1 有 2200 万条记录,df2 有 200 万条记录.我正在将 email_address 作为键进行正确连接. test_join = df2.join(df1, "email_address", how = 'right').cache() 两个数据框中的重复电子邮件(如果有)很少.加入后,我试图找到结果数据帧 test_join 的分区大小,使用以下代码: l
..
SparkContext 类中的 getOrCreate 方法的目的是什么?我不明白我们什么时候应该使用这种方法. 如果我有 2 个使用 spark-submit 运行的 spark 应用程序,并且在主方法中我使用 SparkContext.getOrCreate 实例化了 spark 上下文,则两个应用程序都将具有相同的上下文? 或者目的更简单,唯一的目的是当我创建一个spark应用
..
我在 pyspark 中有一个数据框.假设有一些列 a,b,c...我想随着列值的变化将数据分组.说 A B1 次1 年0 x0 年0 x1 年1 次1 年 将有 3 个组为 (1x,1y),(0x,0y,0x),(1y,1x,1y)以及对应的行数据 解决方案 如果我理解正确,您希望在 A 列每次更改值时创建一个不同的组. 首先,我们将创建一个单调递增的 id 以保持行序不变:
..
TL;DR:当我将 Spark DataFrame 转储为 json 时,我总是得到类似 {"key1": "v11", "key2": "v21"}{“key1":“v12",“key2":“v22"}{“key1":“v13",“key2":“v23"} 这是无效的 json.我可以手动编辑转储的文件以获得我可以解析的内容: [{“key1":“v11",“key2":“v21"},{“
..
我正在尝试使用新的 spark 2.1 csv 选项将 DataFrame 保存到 CSV 中 df.select(myColumns: _*).write.mode(SaveMode.Overwrite).option("header", "true").option("codec", "org.apache.hadoop.io.compress.GzipCodec").csv(绝对路径)
..
我需要为我的 spark 应用程序从 postgres 数据库加载/删除特定记录.对于加载,我使用以下格式的火花数据帧 sqlContext.read.format("jdbc").options(Map("url" -> "postgres url",“用户"->"用户" ,“密码"->"xxxxxx" ,“表"->"(select * from employee where emp_id >
..