Reading CSV file in Spark in a distributed manner


Problem description

I am developing a Spark processing framework which reads large CSV files, loads them into RDDs, performs some transformations, and at the end saves some statistics.

The CSV files in question are around 50 GB on average. I'm using Spark 2.0.

My question is:

When I load the files using the sparkContext.textFile() function, does the file need to be stored in the memory of the driver first and then distributed to the workers (thus requiring a rather large amount of memory on the driver)? Or is the file read "in parallel" by every worker, so that none of them needs to store the whole file and the driver acts only as a "manager"?
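For illustration, a minimal sketch of the kind of pipeline described above (the file path, the comma-split parsing, and the row-count "statistic" are hypothetical placeholders, not from the original post):

    import org.apache.spark.sql.SparkSession

    object CsvStats {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("csv-stats").getOrCreate()
        val sc = spark.sparkContext

        // Load a large CSV file as an RDD of lines (path is hypothetical).
        val lines = sc.textFile("hdfs:///data/large.csv")

        // Example transformation: split each line into fields.
        val rows = lines.map(_.split(","))

        // Example "statistic": count the rows.
        println(s"row count: ${rows.count()}")

        spark.stop()
      }
    }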

Thanks in advance.

Answer

When you define the read, the file is divided into partitions based on your parallelism scheme, and the instructions are sent to the workers. The file is then read directly by the workers from the filesystem (hence the need for a distributed filesystem available to all the nodes, such as HDFS).
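A rough sketch of what defining the read looks like; the path and the partition hint of 16 are assumptions for illustration:

    // textFile only records how to read; each worker later reads its own split.
    // The second argument is a minimum number of partitions (a hint, not an exact count).
    val lines = sc.textFile("hdfs:///data/large.csv", minPartitions = 16)
    println(lines.getNumPartitions)   // number of splits the workers will read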

As a side note, it would be much better to read it into a DataFrame using spark.read.csv rather than an RDD. This takes less memory and allows Spark to optimize your queries.
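A minimal sketch of the DataFrame-based read (the options and the column name some_column are assumptions, not from the original answer):

    // `spark` is a SparkSession; works with the built-in CSV source in Spark 2.0+.
    val df = spark.read
      .option("header", "true")        // first line contains column names
      .option("inferSchema", "true")   // let Spark derive column types (costs an extra pass)
      .csv("hdfs:///data/large.csv")

    df.groupBy("some_column").count().show()   // Catalyst can plan and optimize this query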

Update

In the comments it was asked what would happen if the filesystem were not distributed and the file were located on only one machine. The answer is that if you have more than one machine, it will most likely fail.

When you call sparkContext.textFile, nothing is actually read; it just tells Spark WHAT you want to read. You then apply some transformations, and still nothing is read because you are only defining a plan. Once you perform an action (e.g. collect), the actual processing begins. Spark divides the job into tasks and sends them to the executors. The executors (which might be on the master node or on worker nodes) then attempt to read portions of the file. The problem is that any executor NOT on the master node will look for the file, fail to find it, and cause its tasks to fail. Spark retries several times (I believe the default is 4) and then fails completely.
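A small sketch of that lazy behaviour, assuming a path that exists on only one machine (the path and the take(5) action are hypothetical):

    // Nothing is read here: these lines only build the execution plan.
    val lines  = sc.textFile("/data/only-on-master.csv")   // hypothetical non-distributed path
    val parsed = lines.map(_.split(","))

    // The action triggers the real read. Executors on other machines cannot find
    // the file, their tasks fail, are retried (spark.task.maxFailures, default 4),
    // and the whole job finally aborts.
    val sample = parsed.take(5)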

Of course, if you have just one node, then all executors can see the file and everything works fine. Also, in theory, tasks could fail on a worker, be rerun on the master, and succeed there, but in any case the workers will not do any work unless they can see a copy of the file.

You can solve this by copying the file to exactly the same path on all nodes or by using any kind of distributed filesystem (even NFS shares are fine).

Of course, you can always work on a single node, but then you would not be taking advantage of Spark's scalability.

