spark-dataframe相关内容
我目前正在使用 PyODBC 从 SQL Server 中提取数据,并尝试以近实时 (NRT) 方式将数据插入到 Hive 中的表中. 我从源中获取了一行并转换为 List[Strings] 并以编程方式创建架构,但是在创建 DataFrame 时,Spark 抛出 StructType 错误. >>>cnxn = pyodbc.connect(con_string)>>>aj = cnx
..
通常一个组中的所有行都传递给一个聚合函数.我想使用条件过滤行,以便仅将组中的某些行传递给聚合函数.使用 PostgreSQL 可以进行此类操作.我想用 Spark SQL DataFrame (Spark 2.0.0) 做同样的事情. 代码可能如下所示: val df = ...//一些数据框df.groupBy("A").agg(max("B").where("B").less(10),
..
我有一个数据框gi_man_df,其中组可以是n: +------------------+-----------------+--------+--------------+|组|数字|rand_int|rand_double|+-----------------+-----------------+--------+--------------+|'GI_MAN'|7|3|124.2||'G
..
假设我有下表: +--------------------+--------------------+------+------------+--------------------+|主机|路径|状态|内容大小|时间|+--------------------+--------------------+------+------------+------------+|js002.cc.uts
..
我试图通过仅保留具有特定字符串列的那些行非空来过滤 DataFrame. 操作如下: df.filter($"stringColumn" !== "") 我的编译器显示 !== 自从我移到 Spark 2.0.1 以来已被弃用 如何在 Spark > 2.0 中检查字符串列值是否为空? 解决方案 使用 =!= 作为替代: df.filter($"stringColumn"
..
我有一个定期更新的数据集,我收到了一系列提供更改的 CSV 文件.我想要一个只包含每行最新版本的数据框.有没有办法在 Spark/pyspark 中加载整个数据集,允许并行? 示例: 文件 1(键、值)1、ABC2、DEF3、GHI 文件 2(键、值)2、XYZ4、紫外线 文件 3(键、值)3、JKL4、移动网络运营商 应该导致:1、ABC2、XYZ3、JKL4、移动网络运营
..
我有一个 spark 数据框,我想根据当前行的金额值和基于 groupid 和 id 的金额值的上一行总和来计算运行总计.让我放出df import findsparkfindspark.init()导入pyspark从 pyspark.sql 导入 SparkSessionspark = SparkSession.builder.getOrCreate()将熊猫导入为 pdsc = spark
..
我不确定这是一个错误(或者只是不正确的语法).我四处搜索,没有在其他地方看到这一点,所以我在提交错误报告之前在这里询问. 我正在尝试使用在嵌套列上分区的 Window 函数.我在下面创建了一个小例子来演示这个问题. import sqlContext.implicits._导入 org.apache.spark.sql.functions._导入 org.apache.spark.sql.
..
如您所知,DataFrame 可以包含复杂类型的字段,例如结构 (StructType) 或数组 (ArrayType).在我的情况下,您可能需要将所有 DataFrame 数据映射到一个带有简单类型字段(字符串、整数...)的 Hive 表.我已经在这个问题上挣扎了很长时间,我终于找到了一个我想分享的解决方案.另外,我相信它可以改进,所以请随时回复您自己的建议. 它基于 这个线程,但也有效
..
在 Spark 的文档中,聚合器: 抽象类聚合器[-IN, BUF, OUT] 扩展可序列化 用户定义聚合的基类,可以是用于数据集操作以获取组的所有元素和将它们减少到一个值. UserDefinedAggregateFunction 是: 抽象类 UserDefinedAggregateFunction 扩展可序列化 实现用户自定义聚合函数的基类(UDAF).
..
我在从 spark master url 运行应用程序时遇到一个奇怪的问题,其中 UI 无限期地报告“WAITING"的“STATE",因为 0 个内核显示在 RUNNING APPLICATIONs 表下,无论我配置什么核心计数. 我使用以下设置配置了我的应用程序,其中 spark.max.cores = 2 &spark.default.cores = 2 &内存设置为3GB.该机器是具
..
我有以下包含 2 列的数据框: 第一列有列名 第二列包含值列表. +--------------------+--------------------+|栏目|分位数|+--------------------+--------------------+|租金|[4000.0, 4500.0, ...||is_rent_changed|[0.0, 0.0, 0.0, 0...||电话
..
我使用 sqlContext 创建了一个数据框,但日期时间格式有问题,因为它被标识为字符串. df2 = sqlContext.createDataFrame(i[1])df2.showdf2.printSchema() 结果: 2016-07-05T17:42:55.238544+09002016-07-05T17:17:38.842567+09002016-06-16T19:54:09.
..
我正在尝试从包含 scala 函数定义的字符串中定义 spark(2.0) 中的 udf.这是片段: val 宇宙:scala.reflect.runtime.universe.type = scala.reflect.runtime.universe导入宇宙._导入 scala.reflect.runtime.currentMirror导入 scala.tools.reflect.ToolBo
..
假设我有一个如下所示的数据框: +---+-----------+-----------+|身份证|地址1|地址2|+---+-----------+-----------+|1|地址 1.1|地址 1.2||2|地址 2.1|地址 2.2|+---+-----------+-----------+ 我想直接对 address1 和 address2 列中的字符串应用自定义函数,例如: de
..
我有一个非常大的 pyspark 数据框.我需要将数据帧转换为每一行的 JSON 格式的字符串,然后将该字符串发布到 Kafka 主题.我最初使用了以下代码. 用于 df.toJSON().collect() 中的消息:kafkaClient.send(消息) 然而,数据帧非常大,因此在尝试 collect() 时失败. 我正在考虑使用 UDF 因为它会逐行处理它. from pys
..
我有包含三列 "x" 、"y" 和 "z" 的数据框 x y z十亿 12452 22114521 330邮编 12563 16022516 142 我需要创建一个由这个公式派生的另一列 (m = z/y+z) 所以新的数据框应该是这样的: x y z m十亿 12452 221 .01743mb 14521 330 .02222pl 12563 160 .0125722516 142
..
谁能解释一下我在 spark DAG 中的 spark 阶段中交换的含义.我的大部分阶段要么开始,要么结束. 1).WholeStageCodeGen -> 交换2).交换 -> WholeStageCodeGen -> SortAggregate -> 交换 解决方案 Whole stage code generation 是一种受现代编译器启发而将整个查询折叠为单个函数的技术在全
..
运行 spark-csv README 有示例 Java像这样的代码 import org.apache.spark.sql.SQLContext;导入 org.apache.spark.sql.types.*; SQLContext sqlContext = new SQLContext(sc);StructType customSchema = new StructType(new Stru
..
我的DataFrame 具有以下结构: -------------------------|品牌 |类型 |金额|-------------------------|乙 ||10 ||乙 |乙 |20 ||C || |30 |------------------------- 我想通过将 type 和 amount 分组为一列 type 来减少行数:Map所以 Brand 将是唯一的,MAP_
..