Combining csv files with mismatched columns


Problem Description

I need to combine multiple csv files into one object (a dataframe, I assume) but they all have mismatched columns, like so:

CSV A

store_location_key | product_key | collector_key | trans_dt | sales | units | trans_key

CSV B

collector_key | trans_dt | store_location_key | product_key | sales | units | trans_key

CSV C

collector_key | trans_dt | store_location_key | product_key | sales | units | trans_id

On top of that, I need these to match with two additional csv files that have a matching column:

Location CSV

store_location_key | region | province | city | postal_code | banner | store_num

Product CSV

product_key | sku | item_name | item_description | department | category

The data types are all consistent, i.e., the sales column is always float, store_location_key is always int, etc. Even if I convert each csv to a dataframe first, I'm not sure that a join would work (except for the last two) because of the way that the columns need to match up.

Solution

To merge the first three CSV files, first read them in separately as DataFrames and then use union. The order and number of columns matter when using union, so you first need to add any missing columns to the DataFrames and then use select to make sure the columns are in the same order.

from pyspark.sql.functions import lit

all_columns = ['collector_key', 'trans_dt', 'store_location_key',
               'product_key', 'sales', 'units', 'trans_key', 'trans_id']

# CSVs A and B lack trans_id, and CSV C lacks trans_key.
# Add each missing column as null, then select in a fixed order.
dfA = (spark.read.csv("a.csv", header=True)
  .withColumn('trans_id', lit(None))
  .select(all_columns))
dfB = (spark.read.csv("b.csv", header=True)
  .withColumn('trans_id', lit(None))
  .select(all_columns))
dfC = (spark.read.csv("c.csv", header=True)
  .withColumn('trans_key', lit(None))
  .select(all_columns))

df = dfA.union(dfB).union(dfC)
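
As an aside, if you are on Spark 3.1 or later, unionByName with allowMissingColumns=True aligns columns by name and fills in the missing ones with nulls, which avoids the manual padding above. A sketch, assuming the same file names:

# unionByName matches columns by name rather than position (Spark 3.1+).
dfA = spark.read.csv("a.csv", header=True)
dfB = spark.read.csv("b.csv", header=True)
dfC = spark.read.csv("c.csv", header=True)

df = dfA.unionByName(dfB, allowMissingColumns=True) \
        .unionByName(dfC, allowMissingColumns=True)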

Note: If the order/number of columns were the same for the CSV files, they could easily be combined by using a single spark.read operation.
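
For example, assuming three files with identical headers, something like this should work:

# spark.read.csv accepts a list of paths, so matching files
# can be read and stacked in a single call.
df_all = spark.read.csv(["a.csv", "b.csv", "c.csv"], header=True)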

After merging the first three CSVs, the rest is easy. Both the location and the product CSV can be combined with the rest using join.

df_location = spark.read.csv("location.csv", header=True)
df_product = spark.read.csv("product.csv", header=True)

# Joining on the column name (rather than an equality expression)
# keeps a single copy of the join key in the result.
df2 = df.join(df_location, on='store_location_key')
df3 = df2.join(df_product, on='product_key')
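
As a quick sanity check on the result (assuming the joins above), you can inspect the schema and a few joined rows:

# Verify that the location and product columns came through the joins.
df3.printSchema()
df3.select('store_location_key', 'region', 'product_key', 'item_name', 'sales').show(5)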
