spark-dataframe相关内容
我尝试在 spark 1.4.1 中的 spark-shell 中使用 spark -csv 包处理 CSV 文件. scala>导入 org.apache.spark.sql.hive.HiveContext导入 org.apache.spark.sql.hive.HiveContext标度>导入 org.apache.spark.sql.hive.orc._导入 org.apache.spa
..
我有两个数据集名称 dataset1 和 dataset2 和 dataset1 就像 empid empame101 约翰102 凯文 和dataset2就像 empid empmarks empaddress101 75 洛杉矶102 69 纽约 dataset2 将非常庞大,我需要对这两个数据集进行一些操作,并需要从以上两个 dataset 中获取结果.据我所知,现在我有两种选择来处
..
当我在所有任务成功后将数据帧中的数据写入镶木地板表(已分区)时,进程卡在更新分区统计信息上. 16/10/05 03:46:13 WARN 日志:快速更新分区统计信息:16/10/05 03:46:14 警告日志:更新大小为 14345257616/10/05 03:48:30 警告日志:快速更新分区统计信息:16/10/05 03:48:31 警告日志:大小更新为 14738281316/10
..
我想弄清楚如何根据另一行更新某些行. 例如,我有一些类似的数据 Id |用户名 |评分 |城市--------------------------------1、飞利浦、2.0、蒙特利尔、...2, 约翰, 4.0, 蒙特利尔, ...3、查尔斯、2.0、德克萨斯、... 我想将同一城市的用户更新为相同的 groupId(1 或 2) Id |用户名 |评分 |城市----------
..
我们正在使用 spark 来解析一个大的 csv 文件,其中可能包含无效数据.我们希望将有效数据保存到数据存储中,同时返回我们导入了多少有效数据和多少无效数据. 我想知道我们如何在 spark 中做到这一点,读取数据时的标准方法是什么? 我目前的方法使用 Accumulator,但由于 Accumulator 在 spark 中的工作方式,它并不准确. //我们定义case类CSVI
..
我尝试在 spark 1.4.1 中的 spark-shell 中使用 spark -csv 包处理 CSV 文件. scala>导入 org.apache.spark.sql.hive.HiveContext导入 org.apache.spark.sql.hive.HiveContext标度>导入 org.apache.spark.sql.hive.orc._导入 org.apache.spa
..
我有 textRDD: org.apache.spark.rdd.RDD[(String, String)] 我想将其转换为 DataFrame.列对应每页(行)的标题和内容. 解决方案 使用 toDF(),如果有列名,请提供. val textDF = textRDD.toDF("title": String, "content": String)textDF:org.apach
..
我想使用 VectorAssembler 将多列转换为一列,但默认情况下数据是压缩的,没有其他选项. val arr2= Array((1,2,0,0,0),(1,2,3,0,0),(1,2,4,5,0),(1,2,2,5,6))val df=sc.parallelize(arr2).toDF("a","b","c","e","f")val colNames=Array("a","b","c",
..
我有一个如下所示的 DataFrame. +---+-------------+-----+|id|帐号|比例|+---+-------------+-----+|1|1500847|6||2|1501199|7||3|1119024|3|+---+-------------+-----+ 我必须填充第二个 DataFrame,它最初是空的,如下所示. id AccountNumber 规模
..
对于包含 25 列的表,我尝试将 DF 修改为 RDD.此后我才知道 Scala(直到 2.11.8)最多可以使用 22 个元组. val rdd = sc.textFile("/user/hive/warehouse/myDB.db/myTable/")rdd: org.apache.spark.rdd.RDD[String] =/user/hive/warehouse/myDB.db/myT
..
我有一个数据框 (input_dataframe),如下所示: id test_column1 0.252 1.13 124 测试5 1.33346 .11 我想添加一列 result,如果 test_column 具有十进制值,则该列将值设为 1 并且0 如果 test_column 有任何其他值.test_column 的数据类型是字符串.以下是预期的输出: id test_column
..
我正在尝试为城市中的朋友寻找连接组件.我的数据是具有城市属性的边列表. 城市 |资源中心 |目的地 休斯顿凯尔 -> 本尼 休斯顿本尼 -> 查尔斯 休斯顿查尔斯 -> 丹尼 奥马哈卡罗尔 -> 布莱恩 等等. 我知道 pyspark 的 GraphX 库的 connectedComponents 函数将遍历图的所有边以找到连接的组件,我想避免这种情况.我
..
嗨,我是 Spark Streaming 的新手.我正在尝试读取 xml 文件并将其发送到 kafka 主题.这是我的 Kafka 代码,它向 Kafka-console-consumer 发送数据. 代码: package org.apache.kafka.Kafka_Producer;导入 java.io.BufferedReader;导入 java.io.FileNotFoundEx
..
在 Spark 1.5.1 中,我已经能够使用 Thrift Server 从 Beeline 访问 spark-shell 临时表.通过阅读 Stackoverflow 上相关问题的答案,我已经能够做到这一点. 但是,升级到 Spark 2.0 后,我无法再从 Beeline 看到临时表,这是我正在遵循的步骤. 我正在使用以下命令启动 spark-shell: ./bin/spar
..
运行 spark-submit 作业并收到“无法获取广播_58_piece0..."错误.我真的不确定我做错了什么.我是否过度使用 UDF?功能太复杂? 作为我目标的总结,我正在解析 pdf 中的文本,这些文本以 base64 编码的字符串形式存储在 JSON 对象中.我正在使用 Apache Tika 获取文本,并尝试大量使用数据框以简化操作. 我编写了一段代码,通过 tika 将文
..
我正在尝试添加 JSONSerDe jar 文件以访问 json 数据,将 JSON 数据从 spark 作业加载到 hive 表.我的代码如下所示: SparkConf sparkConf = new SparkConf().setAppName("KafkaStreamToHbase");JavaSparkContext sc = new JavaSparkContext(sparkConf
..
:) 当你有一个数据框时,你可以添加列并使用 selectExprt 方法填充它们的行 像这样: scala>表.show+------+--------+---------+--------+--------+|idempr|tipperrd|codperrd|tipperrt|codperrt|+------+--------+---------+--------+--------
..
我有以下结构的数据框: root|-- 索引:long (nullable = true)|-- 文本:字符串(可为空 = 真)|-- topicDistribution: struct (nullable = true)||-- 类型:long (nullable = true)||-- 值:数组(可为空 = 真)|||-- 元素:double (containsNull = true)|--
..
我有一个 spark SQL 问题 我很欣赏一些关于从嵌套结构数组中进行条件选择的最佳方法的指导. 我在下面有一个示例 json 文档 ``` {"id":"p1",“外部ID":[{"system":"a","id":"1"},{"system":"b","id":"2"},{"system":"c","id":"3"}]} ``` 在 spark SQL 中,我想根据某些
..
我在 python/pyspark 中有一个数据框.列具有特殊字符,如点(.) 空格、括号(()) 和括号{}.以他们的名义. 现在我想重命名列名,如果有点和空格,则用下划线替换它们,如果有 () 和 {},则将它们从列名中删除. 我已经这样做了 df1 = df.toDF(*(re.sub(r'[\.\s]+', '_', c) for c in df.columns)) 有了这
..