pyspark-sql相关内容

Spark“限制"不能并行运行?

我有一个简单的连接,我限制了两边.在解释计划中我看到在执行限制之前有一个 ExchangeSingle 操作,确实我看到在这个阶段集群中只有一个任务在运行. 这当然会显着影响性能(消除限制会消除单个任务瓶颈,但会延长连接,因为它适用于更大的数据集). limit 真的不可并行吗?如果是这样 - 是否有解决方法? 我在 Databricks 集群上使用 spark. 编辑:关 ..
发布时间:2021-11-14 22:45:55 其他开发

如何使用 SPARK 将多个镶木地板文件转换为 TFrecord 文件?

我想根据特定条件从大型 DataFrame 生成分层的 TFrecord 文件,为此我使用 write.partitionBy().我也在 SPARK 中使用了 tensorflow-connector,但这显然不能与 write.partitionBy() 操作一起使用.因此,除了尝试分两步工作之外,我还没有找到其他方法: 根据我的情况,使用 partitionBy() 重新分区数据帧,并 ..
发布时间:2021-11-14 22:45:29 其他开发

Spark SQL 广播提示中间表

我在使用广播提示时遇到问题(可能是缺乏 SQL 知识). 我有一个类似的查询 SELECT */* 广播(a) */从一个内连接 b在 ....内连接 c在 .... 我想做 SELECT */* 广播(a) */从一个内连接 b在 ....内部连接 ​​c/* 广播(AjoinedwithB)*/在 .... 我的意思是,我想强制广播加入(我宁愿避免更改火花参数以在任何地方强制它) ..
发布时间:2021-11-14 22:45:17 其他开发

“‘DataFrame’对象没有‘apply’属性";尝试应用 lambda 来创建新列时

我的目标是在 Pandas DataFrame 中添加一个新列,但我遇到了一个奇怪的错误. 新列应该是现有列的转换,可以在字典/哈希图中进行查找. # 加载数据df = sqlContext.read.format(...).load(train_df_path)# 实例化地图some_map = {'一':0,'b': 1,'c': 1,}# 使用地图创建一个新列df['new_colu ..
发布时间:2021-11-14 22:44:04 Python

替换深层嵌套模式 Spark Dataframe 中的值

我是 pyspark 的新手.我试图了解如何访问具有多层嵌套结构和数组的镶木地板文件.我需要用空值替换数据帧(带有嵌套模式)中的一些值,我已经看到了这个 解决方案 它适用于结构,但不确定它如何适用于数组. 我的架构是这样的 |-- unitOfMeasure: 结构体||-- 原始:结构|||-- id: 字符串|||-- codingSystemId: 字符串|||-- 显示:字符串|| ..
发布时间:2021-11-14 22:43:42 其他开发

将火花数据帧写入单个镶木地板文件

我正在尝试做一些非常简单的事情,但我遇到了一些非常愚蠢的斗争.我认为这一定与对 Spark 正在做什么的根本误解有关.我将不胜感激任何帮助或解释. 我有一个非常大(~3 TB、~300MM 行、25k 分区)的表,在 s3 中保存为镶木地板,我想给某人一个小样本作为单个镶木地板文件.不幸的是,这需要很长时间才能完成,我不明白为什么.我尝试了以下方法: tiny = spark.sql("S ..
发布时间:2021-11-14 22:39:43 其他开发

pyspark 用另一个值替换数据框中的所有值

我的 pyspark 数据框中有 500 列……有些是字符串类型,有些是 int 类型,有些是 boolean(100 个布尔列).现在,所有布尔列都有两个不同的级别 - Yes 和 No,我想将它们转换为 1/0 对于字符串,我有三个值 - 通过、失败和空.如何用 0 替换这些空值?fillna(0) 仅适用于整数 c1|c2 |c3 |c4|c5..... |c500是|是|通过|4 ..
发布时间:2021-11-14 22:39:15 Python

将多行合并为一行

我正在尝试通过 pyspark 构建 sql 来实现这一点.目标是将多行合并成单行例子:我想转换这个 +-----+----+----+-----+|col1|col2|col3|col4|+-----+----+----+-----+|x |是 |z |13::1||x |是 |z |10::2|+-----+----+----+-----+ 到 +-----+----+----+---- ..
发布时间:2021-11-14 22:37:02 其他开发

PySpark 中别名方法的目的是什么?

在 Python 中学习 Spark 时,我无法理解 alias 方法的目的及其用法.文档 显示它被用于创建使用新名称的现有 DataFrame,然后将它们连接在一起: >>>从 pyspark.sql.functions 导入 *>>>df_as1 = df.alias("df_as1")>>>df_as2 = df.alias("df_as2")>>>join_df = df_as1.joi ..
发布时间:2021-11-14 22:34:54 Python