Combining csv files with mismatched columns


Problem description

I need to combine multiple csv files into one object (a dataframe, I assume) but they all have mismatched columns, like so:

CSV A

store_location_key | product_key | collector_key | trans_dt | sales | units | trans_key

CSV B

collector_key | trans_dt | store_location_key | product_key | sales | units | trans_key

CSV C

collector_key | trans_dt | store_location_key | product_key | sales | units | trans_id

On top of that, I need these to match with two additional csv files that have a matching column:

Location CSV

store_location_key | region | province | city | postal_code | banner | store_num

Product CSV

product_key | sku | item_name | item_description | department | category

The data types are all consistent, i.e., the sales column is always float, store_location_key is always int, etc. Even if I convert each csv to a dataframe first, I'm not sure that a join would work (except for the last two) because of the way that the columns need to match up.

Recommended answer

To merge the first three CSV files, first read them separately as DataFrames and then use union. The order and number of columns matter when using union, so first you need to add any missing columns to the DataFrames and then use select to make sure the columns are in the same order.

from pyspark.sql.functions import lit

# One shared column order for all three files; each file is missing one of
# trans_key / trans_id, so both appear here.
all_columns = ['collector_key', 'trans_dt', 'store_location_key', 'product_key', 'sales', 'units', 'trans_key', 'trans_id']

# Add the column each file is missing as a null string (all columns are read
# as strings since the schema isn't inferred), then select in a fixed order.
dfA = (spark.read.csv("a.csv", header=True)
  .withColumn("trans_id", lit(None).cast("string"))
  .select(all_columns))
dfB = (spark.read.csv("b.csv", header=True)
  .withColumn("trans_id", lit(None).cast("string"))
  .select(all_columns))
dfC = (spark.read.csv("c.csv", header=True)
  .withColumn("trans_key", lit(None).cast("string"))
  .select(all_columns))

# union matches columns by position, so the select above keeps the rows aligned.
df = dfA.union(dfB).union(dfC)
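
If you're on Spark 3.1 or later, unionByName with allowMissingColumns=True does the null-padding for you, so the withColumn/select steps above aren't needed. A minimal sketch, reusing the same hypothetical file names:

dfA = spark.read.csv("a.csv", header=True)
dfB = spark.read.csv("b.csv", header=True)
dfC = spark.read.csv("c.csv", header=True)

# Spark 3.1+: resolve columns by name and fill any missing ones with null.
df = (dfA.unionByName(dfB, allowMissingColumns=True)
         .unionByName(dfC, allowMissingColumns=True))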

Note: If the order/number of columns were the same for the CSV files, they could easily be combined by using a single spark.read operation.
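
For example, spark.read.csv accepts a list of paths, so identically-structured files can be read in one pass (a sketch, assuming the same hypothetical file names as above):

# Only valid when all files share the same header and column order.
df = spark.read.csv(["a.csv", "b.csv", "c.csv"], header=True)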

After merging the first three CSVs, the rest is easy. Both the location and the product CSVs can be combined with the rest using join.

df_location = spark.read.csv("location.csv", header=True)
df_product = spark.read.csv("product.csv", header=True)

# Joining on the column name (instead of an equality expression) keeps a single
# store_location_key / product_key column in the result rather than a duplicate pair.
df2 = df.join(df_location, on="store_location_key")
df3 = df2.join(df_product, on="product_key")
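
A quick sanity check after the joins; note that join defaults to an inner join, so rows whose store_location_key or product_key has no match in the lookup CSVs are dropped:

df3.printSchema()   # transaction, location, and product columns in one frame
df3.count()         # equals the combined row count of a/b/c only if every key matches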
