Pyspark 从 csv 文件中读取 delta/upsert 数据集 [英] Pyspark read delta/upsert dataset from csv files

查看:33
本文介绍了Pyspark 从 csv 文件中读取 delta/upsert 数据集的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个定期更新的数据集,我收到了一系列提供更改的 CSV 文件.我想要一个只包含每行最新版本的数据框.有没有办法在 Spark/pyspark 中加载整个数据集,允许并行?

I have a dataset that is updated periodically, that I receive as a series of CSV files giving the changes. I'd like a Dataframe that contains only the latest version of each row. Is there a way to load the whole dataset in Spark/pyspark that allows for parallelism?

示例:

  • 文件 1(键、值)<代码>1、ABC2、DEF3、GHI
  • 文件 2(键、值)<代码>2、XYZ4、紫外线
  • 文件 3(键、值)<代码>3、JKL4、移动网络运营商

应该导致:<代码>1、ABC2、XYZ3、JKL4、移动网络运营商

我知道我可以通过依次加载每个文件然后使用反联接(踢出被替换的旧值)和联合来实现这一点,但这不会让工作负载并行.

I know I could do this by loading each file sequentially and then using an anti join (to kick out old values being replaced) and a union, but that doesn't let the workload be parallel.

推荐答案

你可以

from pyspark.sql.functions import * 
alls = spark.read.csv("files/*").withColumn('filename', input_file_name())

这将加载目录中的所有文件,并允许您对带有文件名的列进行操作.

Which will load all the files in the directory and allow you to operate on column with filename.

我假设文件名具有某种时间戳或键,您可以使用 window 和 row_number 函数对其进行区分和排序.

I assume that filename has some sort of timestamp or key on which You can differentiate and order them with window and row_number function.

这篇关于Pyspark 从 csv 文件中读取 delta/upsert 数据集的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆