spark-dataframe相关内容
我在 Spark(2.2.0) 上使用 python3.我想将我的 UDF 应用于指定的字符串列表. df = ['Apps A','Chrome', 'BBM', 'Apps B', 'Skype']def calc_app(app, app_list):browser_list = ['Chrome', 'Firefox', 'Opera']chat_list = ['WhatsApp',
..
我正在使用 pySpark 2.3.0,并创建了一个非常简单的 Spark 数据框来测试 VectorAssembler 的功能.这是一个更大的数据框的一个子集,我只选择了几个数字(双数据类型)列: >>>cols = ['index','host_listings_count','neighborhood_group_cleansed',\'浴室','卧室','床','square_feet'
..
我是 pyspark 的新手.我想对文本文件执行一些机器学习. from pyspark import Row从 pyspark.context 导入 SparkContext从 pyspark.sql.session 导入 SparkSession从 pyspark 导入 SparkConfsc = SparkContextspark = SparkSession.builder.appNam
..
我知道在 RDD 中,我们不鼓励使用 groupByKey,并鼓励使用诸如 reduceByKey() 和 aggregateByKey() 之类的替代方法,因为这些其他方法将首先减少每个分区,然后执行 groupByKey() 从而减少被洗牌的数据量. 现在,我的问题是这是否仍然适用于数据集/数据帧?我在想,由于催化剂引擎做了很多优化,催化剂会自动知道它应该在每个分区上减少,然后执行 gr
..
我想通过 Spark 从 MySQL 读取数据.我看到的 API 能够从特定表中读取数据.类似的东西, val prop = new java.util.Propertiesprop.setProperty("user", "")prop.setProperty("密码", "")sparkSession.read.jdbc("jdbc:mysql://????:3306/???", "some
..
我有一个 pyspark 数据框,其中列偶尔会有与另一列匹配的错误值.它看起来像这样: |日期 |纬度 ||2017-01-01 |43.4553 ||2017-01-02 |42.9399 ||2017-01-03 |43.0091 ||2017-01-04 |2017-01-04 | 显然,最后的纬度值不正确.我需要删除任何和所有这样的行.我想过使用 .isin() 但我似乎无法让它工作.
..
我在 Scala 中转置 DataFrame 中的值时遇到问题.我最初的 DataFrame 看起来像这样: +----+----+----+----+----+|col1|col2|col3|col4|+----+----+----+----+|A|X|6|空||乙|Z|空|5||C|是|4|空|+----+----+----+----+ col1 和 col2 是 String 类型,co
..
我想动态生成一个包含报告标题记录的数据框,因此根据以下字符串的值创建一个数据框: val headerDescs : String = "Name,Age,Location"val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
..
我已经在Spark中创建了数据,然后进行了join操作,最后我必须将输出保存到分区文件中. 我正在将数据帧转换为 RDD,然后保存为允许我使用多字符分隔符的文本文件.我的问题是在这种情况下如何使用数据框列作为自定义分区. 我不能为自定义分区使用以下选项,因为它不支持多字符分隔符: dfMainOutput.write.partitionBy("DataPartition","Stat
..
如何在 Scala 中创建一个具有 100 行和 3 列且随机整数值在 (1, 100) 范围内的 Spark DataFrame? 我知道如何手动创建 DataFrame,但我无法自动创建: val df = sc.parallelize(Seq((1,20, 40), (60, 10, 80), (30, 15, 30))).toDF("col1", "col2", "col3")
..
我想从数据框中获取包含至少一个空值的列的名称. 考虑下面的数据框: val dataset = sparkSession.createDataFrame(Seq((7, null, 18, 1.0),(8, "CA", null, 0.0),(9, "新西兰", 15, 0.0))).toDF("id", "country", "hour", "clicked") 我想获取列名称“国家/
..
这是我的 mongodb 集合架构的一部分: |-- 变量:struct (nullable = true)||-- actives: struct (nullable = true)|||-- 数据:结构(可为空 = 真)||||-- 0: struct (nullable = true)|||||--active: 整数(可为空 = 真)|||||-- 非活动:整数(可为空 = 真) 我已
..
我正在尝试在 Spark 中编写一些注重性能的代码,并想知道我是否应该编写一个 Aggregator 或 用户定义的聚合函数 (UDAF) 用于我对数据帧的汇总操作. 我无法在任何地方找到任何关于这些方法有多快以及您应该在 spark 2.0+ 中使用的数据. 解决方案 你应该写一个 Aggregator 而不是 UserDefinedAggregateFunction 作为 Use
..
使用 Spark SQL,我有两个数据帧,它们是从一个创建的,例如: df = sqlContext.createDataFrame(...);df1 = df.filter("value = 'abc'");//[路径,值]df2 = df.filter("value = 'qwe'");//[路径,值] 我想过滤 df1,如果它的“路径"的一部分是 df2 中的任何路径.因此,如果 df1
..
我在读取一个 6gb 的大单行 json 文件时遇到以下错误: 作业因阶段失败而中止:阶段 0.0 中的任务 5 失败 1 次,最近一次失败:阶段 0.0 中的任务 5.0 丢失(TID 5,本地主机):java.io.IOException:换行前的字节太多: 2147483648 spark 不会读取带有新行的 json 文件,因此整个 6 GB json 文件都在一行上: jf = s
..
我从这样的 csv 文件中取出了一些行 pd.DataFrame(CV_data.take(5), columns=CV_data.columns) 并对其执行了一些功能.现在我想再次将它保存在 csv 中,但它给出了错误 module 'pandas' has no attribute 'to_csv'我正在尝试像这样保存它 pd.to_csv(CV_data, sep='\t', enc
..
我有一个包含一列数组字符串的数据框 A. ...|-- 浏览:数组(可为空 = 真)||-- 元素:字符串(containsNull = true)... 例如三个样本行是 +---------+--------+---------+|第 1 列|浏览|第 n 列|+---------+--------+---------+|foo1|[X,Y,Z]|酒吧1||foo2|[K,L]|栏2||
..
我有一个包含“date"和“value"两列的数据框,如何向数据框中添加 2 个新列“value_mean"和“value_sd",其中“value_mean"是过去 10 个“value"的平均值天(包括“日期"中指定的当天)和“value_sd"是过去 10 天“值"的标准偏差? 解决方案 Spark sql 提供 各种数据框函数,如avg、mean、sum等 您只需要使用 sp
..
我们如何在没有重复列的情况下合并 2 个数据框 a.show()+-----+-----------+--------+------+|姓名|上次|持续时间|状态|+-----+--------------------+--------+------+|鲍勃|2015-04-23 12:33:00|1|登出||爱丽丝|2015-04-20 12:33:00|5|登录|+-----+-------
..
我正在尝试根据查询从 DB2 数据库中读取数据.查询的结果集大约有 20 - 4000 万条记录.DF 的分区是基于一个整数列完成的. 我的问题是,一旦加载了数据,我如何检查每个分区创建了多少条记录.基本上我想检查的是数据倾斜是否发生?如何检查每个分区的记录数? 解决方案 例如,您可以映射分区并确定它们的大小: val rdd = sc.parallelize(0 到 1000,
..