Pyspark read delta/upsert dataset from csv files
Question
I have a dataset that is updated periodically; I receive the changes as a series of CSV files. I'd like a DataFrame that contains only the latest version of each row. Is there a way to load the whole dataset in Spark/PySpark that allows for parallelism?
Example:

- File 1 (key, value):
  1, ABC
  2, DEF
  3, GHI
- File 2 (key, value):
  2, XYZ
  4, UVW
- File 3 (key, value):
  3, JKL
  4, MNO

Should result in:

  1, ABC
  2, XYZ
  3, JKL
  4, MNO
I know I could do this by loading each file sequentially, then using an anti join (to kick out the old values being replaced) and a union, but that doesn't let the workload run in parallel.
Answer
You can do:

from pyspark.sql.functions import input_file_name

# Load every CSV in the directory, tagging each row with its source file
alls = spark.read.csv("files/*").withColumn("filename", input_file_name())
This loads all the files in the directory and lets you operate on the filename column.
I assume the filenames contain some sort of timestamp or key by which you can distinguish the files and order them with the window and row_number functions.