Pyspark从CSV文件读取增量/更新数据集 [英] Pyspark read delta/upsert dataset from csv files
问题描述
我有一个定期更新的数据集,作为一系列提供更改的CSV文件接收.我想要一个仅包含每行最新版本的数据框.有没有一种方法可以将整个数据集加载到Spark/pyspark中以实现并行性?
I have a dataset that is updated periodically, that I receive as a series of CSV files giving the changes. I'd like a Dataframe that contains only the latest version of each row. Is there a way to load the whole dataset in Spark/pyspark that allows for parallelism?
示例:
- 文件1(键,值)
1,ABC 2,DEF 3,GHI
- 文件2(键,值)
2,XYZ 4,UVW
- 文件3(键,值)
3,JKL 4,MNO
- File 1 (Key, Value)
1,ABC 2,DEF 3,GHI
- File 2 (Key, Value)
2,XYZ 4,UVW
- File 3 (Key, Value)
3,JKL 4,MNO
应导致:
1,ABC
2,XYZ
3,JKL
4,MNO
Should result in:
1,ABC
2,XYZ
3,JKL
4,MNO
我知道我可以通过依次加载每个文件,然后使用反连接(踢出要替换的旧值)和联合来做到这一点,但这并不能使工作负载并行化.
I know I could do this by loading each file sequentially and then using an anti join (to kick out old values being replaced) and a union, but that doesn't let the workload be parallel.
推荐答案
您可以
from pyspark.sql.functions import *
alls = spark.read.csv("files/*").withColumn('filename', input_file_name())
这将加载目录中的所有文件,并允许您对带有文件名的列进行操作.
Which will load all the files in the directory and allow you to operate on column with filename.
我认为文件名具有某种时间戳或键,您可以使用window和row_number函数对它们进行区分和排序.
I assume that filename has some sort of timestamp or key on which You can differentiate and order them with window and row_number function.
这篇关于Pyspark从CSV文件读取增量/更新数据集的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!