spark-dataframe相关内容

Spark Sql: TypeError(“StructType 不能接受 %s 类型的对象"% type(obj))

我目前正在使用 PyODBC 从 SQL Server 中提取数据,并尝试以近实时 (NRT) 方式将数据插入到 Hive 中的表中. 我从源中获取了一行并转换为 List[Strings] 并以编程方式创建架构,但是在创建 DataFrame 时,Spark 抛出 StructType 错误. >>>cnxn = pyodbc.connect(con_string)>>>aj = cnx ..
发布时间:2021-11-14 22:29:06 Python

如何使用spark sql过滤特定聚合的行?

通常一个组中的所有行都传递给一个聚合函数.我想使用条件过滤行,以便仅将组中的某些行传递给聚合函数.使用 PostgreSQL 可以进行此类操作.我想用 Spark SQL DataFrame (Spark 2.0.0) 做同样的事情. 代码可能如下所示: val df = ...//一些数据框df.groupBy("A").agg(max("B").where("B").less(10), ..

Pyspark 从 csv 文件中读取 delta/upsert 数据集

我有一个定期更新的数据集,我收到了一系列提供更改的 CSV 文件.我想要一个只包含每行最新版本的数据框.有没有办法在 Spark/pyspark 中加载整个数据集,允许并行? 示例: 文件 1(键、值)1、ABC2、DEF3、GHI 文件 2(键、值)2、XYZ4、紫外线 文件 3(键、值)3、JKL4、移动网络运营商 应该导致:1、ABC2、XYZ3、JKL4、移动网络运营 ..
发布时间:2021-11-14 22:28:19 其他开发

带有嵌套列的 Apache Spark 窗口函数

我不确定这是一个错误(或者只是不正确的语法).我四处搜索,没有在其他地方看到这一点,所以我在提交错误报告之前在这里询问. 我正在尝试使用在嵌套列上分区的 Window 函数.我在下面创建了一个小例子来演示这个问题. import sqlContext.implicits._导入 org.apache.spark.sql.functions._导入 org.apache.spark.sql. ..

用不同的数据类型在 Scala 中展平数据帧

如您所知,DataFrame 可以包含复杂类型的字段,例如结构 (StructType) 或数组 (ArrayType).在我的情况下,您可能需要将所有 DataFrame 数据映射到一个带有简单类型字段(字符串、整数...)的 Hive 表.我已经在这个问题上挣扎了很长时间,我终于找到了一个我想分享的解决方案.另外,我相信它可以改进,所以请随时回复您自己的建议. 它基于 这个线程,但也有效 ..
发布时间:2021-11-14 22:27:29 其他开发

即使在应用程序中设置内核时,Spark UI 也显示 0 个内核

我在从 spark master url 运行应用程序时遇到一个奇怪的问题,其中 UI 无限期地报告“WAITING"的“STATE",因为 0 个内核显示在 RUNNING APPLICATIONs 表下,无论我配置什么核心计数. 我使用以下设置配置了我的应用程序,其中 spark.max.cores = 2 &spark.default.cores = 2 &内存设置为3GB.该机器是具 ..

PySpark - 逐行转换为 JSON

我有一个非常大的 pyspark 数据框.我需要将数据帧转换为每一行的 JSON 格式的字符串,然后将该字符串发布到 Kafka 主题.我最初使用了以下代码. 用于 df.toJSON().collect() 中的消息:kafkaClient.send(消息) 然而,数据帧非常大,因此在尝试 collect() 时失败. 我正在考虑使用 UDF 因为它会逐行处理它. from pys ..
发布时间:2021-11-14 22:25:24 Python

Spark阶段交换的意义

谁能解释一下我在 spark DAG 中的 spark 阶段中交换的含义.我的大部分阶段要么开始,要么结束. 1).WholeStageCodeGen -> 交换2).交换 -> WholeStageCodeGen -> SortAggregate -> 交换 解决方案 Whole stage code generation 是一种受现代编译器启发而将整个查询折叠为单个函数的技术在全 ..
发布时间:2021-11-14 22:24:52 其他开发