Pyspark read delta/upsert dataset from csv files


Problem Description

I have a dataset that is updated periodically, which I receive as a series of CSV files giving the changes. I'd like a DataFrame that contains only the latest version of each row. Is there a way to load the whole dataset in Spark/PySpark that allows for parallelism?

Example:

  • File 1 (Key, Value): 1,ABC 2,DEF 3,GHI
  • File 2 (Key, Value): 2,XYZ 4,UVW
  • File 3 (Key, Value): 3,JKL 4,MNO

Should result in: 1,ABC 2,XYZ 3,JKL 4,MNO

I know I could do this by loading each file sequentially and then using an anti join (to kick out the old values being replaced) followed by a union, but that doesn't parallelize the workload; a sketch of that sequential approach is shown below.
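For concreteness, here is a minimal sketch of that sequential anti-join/union approach. The file paths are hypothetical, and it assumes each CSV has exactly the two columns Key and Value, with the file list ordered oldest-first:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
file_paths = ["file1.csv", "file2.csv", "file3.csv"]  # hypothetical paths, oldest first

result = None
for path in file_paths:
    df = spark.read.csv(path).toDF("Key", "Value")
    if result is None:
        result = df
    else:
        # drop the accumulated rows whose keys are replaced by this file,
        # then append the newer rows
        result = result.join(df, on="Key", how="left_anti").union(df)

Each iteration depends on the previous one, which is exactly why this chain of anti joins and unions can't be parallelized across files.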

Recommended Answer

You can do:

from pyspark.sql.functions import input_file_name

# read every CSV under the directory and record which file each row came from
alls = spark.read.csv("files/*").withColumn('filename', input_file_name())

This loads all the files in the directory at once and gives you a filename column to work with.

I assume the filename contains some sort of timestamp or key you can sort on, so you can rank the rows per key with a window and the row_number function and keep only the newest one.
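For example, if the filenames sort lexicographically in chronological order (an assumption; the column names Key/Value and the files/* path are also illustrative), the deduplication could look like this:

from pyspark.sql import Window
from pyspark.sql.functions import col, input_file_name, row_number

# read all files at once, tagging each row with its source file
alls = (spark.read.csv("files/*")
        .toDF("Key", "Value")
        .withColumn("filename", input_file_name()))

# rank the rows within each key, newest file first, and keep only the top row
w = Window.partitionBy("Key").orderBy(col("filename").desc())
latest = (alls.withColumn("rn", row_number().over(w))
              .filter(col("rn") == 1)
              .drop("rn", "filename"))

Since the single read covers all files and the window is computed per key, Spark can parallelize both the load and the deduplication.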
