在PySpark中更新数据帧的某些行或创建新的数据帧 [英] Update some rows of a dataframe or create new dataframe in PySpark

查看:11
本文介绍了在PySpark中更新数据帧的某些行或创建新的数据帧的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是新手,我的目标是在AWS Glue中使用PySpark脚本:

  1. 从Glue=>;中的输入文件读取数据帧完成
  2. 更改满足条件=>;面临问题的某些行的列
  3. 将同一架构上更新的数据帧写入S3=>;Done

任务似乎很简单,但我找不到完成它的方法,并且仍然面临着更改代码的不同问题。

到目前为止,我的代码如下所示:

Transform2.printSchema() # input schema after reading 
Transform2 = Transform2.toDF()
def updateRow(row):
    # my logic to update row based on a global condition 
    #if row["primaryKey"]=="knownKey": row["otherAttribute"]= None
    return row

LocalTransform3 = [] # creating new dataframe from Transform2 
for row in Transform2.rdd.collect():
    row = row.asDict()
    row = updateRow(row)
    LocalTransform3.append(row)
print(len(LocalTransform3))

columns = Transform2.columns
Transform3 = spark.createDataFrame(LocalTransform3).toDF(*columns)
print('Transform3 count', Transform3.count())
Transform3.printSchema()
Transform3.show(1,truncate=False)

Transform4 = DynamicFrame.fromDF(Transform3, glueContext, "Transform3")
print('Transform4 count', Transform4.count()) 

我尝试使用多种功能,例如:

  • 使用map更新lambda中的现有行
  • 使用Collect()
  • 使用createDataFrame()创建新数据帧

但在以下步骤中遇到错误:

  • 无法创建新的更新RDD
  • 无法使用现有列从RDD创建新数据帧

我在不同阶段得到的胶水中的一些错误:

  • ValueError:推断后无法确定某些类型
  • ValueError:前100行无法确定某些类型,请重试抽样
  • 调用z:org.apache.spark.api.python.PythonRDD.runJob.时出错回溯(最近一次调用):

感谢任何工作代码片段或帮助。

推荐答案

from pyspark.sql.functions import col, lit, when

Transform2 = Transform2.toDF()
withKeyMapping = Transform2.withColumn('otherAttribute', when(col("primaryKey") == "knownKey", lit(None)).otherwise(col('otherAttribute')))

这应该适用于您的用例。

这篇关于在PySpark中更新数据帧的某些行或创建新的数据帧的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆