Process multiple directories in Spark separately


Question

I have a list of directories in HDFS, each containing several files. My goal is to merge all files of a directory into a single file, but separately for each directory. What is the fastest way to do this in Spark? Iterating over the directories sequentially is too slow, so I want to do it in parallel. One option might be a thread pool. Maybe there is a better, faster, more Spark-native way?

Thanks!
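
For reference, the thread-pool alternative mentioned in the question might look roughly like the sketch below, which submits one single-file write per directory as a concurrent Spark job (the global ExecutionContext is backed by a thread pool). The directory list, output paths and the spark session value are illustrative assumptions, not part of the original post:

// Hypothetical sketch only: merge each directory into one file, in parallel.
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

val dirs = Seq("/tmp/foo", "/tmp/bar")                 // assumed input directories
val jobs = dirs.map { dir =>
  Future {
    spark.read.text(dir)                               // read every file in the directory
      .coalesce(1)                                     // force a single output file
      .write.text(s"/tmp/merged/${dir.split('/').last}")
  }
}
Await.result(Future.sequence(jobs), Duration.Inf)      // wait for all directories to finish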

Answer

Consider the following test directories foo and bar, which contain these files:

cat /tmp/foo/0.csv
4
cat /tmp/foo/1.csv
3
cat /tmp/bar/0.csv
7

We can read them with the following snippet:

import org.apache.spark.sql.functions.{input_file_name, regexp_extract}  // pre-imported in spark-shell
import spark.implicits._                                                  // for the $"..." column syntax used later

val df = spark.read.csv("/tmp/foo", "/tmp/bar")
  .withColumn("dir", regexp_extract(input_file_name(), """([^/]*)/[^/]+\.csv$""", 1))
df.show()
/*
+---+---+
|_c0|dir|
+---+---+
|4  |foo|
|3  |foo|
|7  |bar|
+---+---+
*/

The function input_file_name gives the absolute path of each file, so we can use it to derive the directory. The regexp_extract call just converts e.g. /tmp/foo/1.csv -> foo.
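
To see just this extraction step in isolation, a tiny check such as the one below may help; the sample path is made up and spark.implicits._ is assumed to be in scope (it is by default in spark-shell):

// Minimal check of the regex used above on a single sample path.
import org.apache.spark.sql.functions.regexp_extract
import spark.implicits._

Seq("/tmp/foo/1.csv").toDF("path")
  .select(regexp_extract($"path", """([^/]*)/[^/]+\.csv$""", 1).as("dir"))
  .show()
/*
+---+
|dir|
+---+
|foo|
+---+
*/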

When Spark writes files, it outputs one file per partition. We therefore need to repartition by the dir column so that all rows of a directory end up in one partition. Finally, we can use partitionBy to carry the directory name into the output file structure as well. For example,

df.repartition($"dir")          // one partition, and hence one output file, per directory
  .write
  .partitionBy("dir")           // creates one dir=<name> subdirectory per value
  .csv("/tmp/out")

produces the files

/tmp/out/._SUCCESS.crc
/tmp/out/dir=bar/.part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc
/tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/_SUCCESS
/tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/dir=foo/.part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc

where /tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv contains

7

and /tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv contains

4
3

AFAIK it is not possible to write these output files into the same directory structure as the original input without, for example, a customised Hadoop FileSystem class.
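
If the requirement is simply one tidy file per input directory rather than the dir=... layout above, one possible follow-up step, not part of the original answer, is to rename the part files afterwards with the Hadoop FileSystem API; the output root and the <name>.csv target naming below are assumptions:

// Sketch: move each merged part file to /tmp/out/<dir>.csv after the write.
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val outRoot = new Path("/tmp/out")
fs.listStatus(outRoot)
  .filter(_.isDirectory)                               // the dir=foo, dir=bar, ... subdirectories
  .foreach { status =>
    val dirName = status.getPath.getName.stripPrefix("dir=")
    fs.listStatus(status.getPath)
      .map(_.getPath)
      .find(_.getName.startsWith("part-"))             // the single merged file
      .foreach(p => fs.rename(p, new Path(outRoot, s"$dirName.csv")))
  }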
