在PySpark中更新数据帧的某些行或创建新的数据帧 [英] Update some rows of a dataframe or create new dataframe in PySpark
本文介绍了在PySpark中更新数据帧的某些行或创建新的数据帧的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我是新手,我的目标是在AWS Glue中使用PySpark脚本:
- 从Glue=>;中的输入文件读取数据帧完成
- 更改满足条件=>;面临问题的某些行的列
- 将同一架构上更新的数据帧写入S3=>;Done
任务似乎很简单,但我找不到完成它的方法,并且仍然面临着更改代码的不同问题。
到目前为止,我的代码如下所示:
Transform2.printSchema() # input schema after reading
Transform2 = Transform2.toDF()
def updateRow(row):
# my logic to update row based on a global condition
#if row["primaryKey"]=="knownKey": row["otherAttribute"]= None
return row
LocalTransform3 = [] # creating new dataframe from Transform2
for row in Transform2.rdd.collect():
row = row.asDict()
row = updateRow(row)
LocalTransform3.append(row)
print(len(LocalTransform3))
columns = Transform2.columns
Transform3 = spark.createDataFrame(LocalTransform3).toDF(*columns)
print('Transform3 count', Transform3.count())
Transform3.printSchema()
Transform3.show(1,truncate=False)
Transform4 = DynamicFrame.fromDF(Transform3, glueContext, "Transform3")
print('Transform4 count', Transform4.count())
我尝试使用多种功能,例如:
- 使用map更新lambda中的现有行
- 使用Collect()
- 使用createDataFrame()创建新数据帧
但在以下步骤中遇到错误:
- 无法创建新的更新RDD
- 无法使用现有列从RDD创建新数据帧
我在不同阶段得到的胶水中的一些错误:
- ValueError:推断后无法确定某些类型
- ValueError:前100行无法确定某些类型,请重试抽样
- 调用z:org.apache.spark.api.python.PythonRDD.runJob.时出错回溯(最近一次调用):
感谢任何工作代码片段或帮助。
推荐答案
from pyspark.sql.functions import col, lit, when
Transform2 = Transform2.toDF()
withKeyMapping = Transform2.withColumn('otherAttribute', when(col("primaryKey") == "knownKey", lit(None)).otherwise(col('otherAttribute')))
这应该适用于您的用例。
这篇关于在PySpark中更新数据帧的某些行或创建新的数据帧的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文