How to process new files in HDFS directory once their writing has eventually finished?


Question

In my scenario I have CSV files continuously uploaded to HDFS.

As soon as a new file gets uploaded I'd like to process the new file with Spark SQL (e.g., compute the maximum of a field in the file, transform the file into parquet). i.e. I have a one-to-one mapping between each input file and a transformed/processed output file.
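For concreteness, the per-file batch job I have in mind looks roughly like this (the column name and paths below are just placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

val spark = SparkSession.builder().appName("process-one-csv").getOrCreate()

// Read a single uploaded CSV file (hypothetical path; schema inferred for brevity)
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs:///incoming/file-0001.csv")

// Example per-file aggregation: maximum of a hypothetical "value" column
df.agg(max("value")).show()

// Write the whole file back out as parquet, preserving the 1:1 input/output mapping
df.write.parquet("hdfs:///processed/file-0001.parquet")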

I was evaluating Spark Streaming to listen to the HDFS directory, then to process the "streamed file" with Spark.

However, in order to process the whole file I would need to know when the "file stream" completes. I'd like to apply the transformation to the whole file in order to preserve the end-to-end one-to-one mapping between files.

How can I transform the whole file and not its micro-batches?

As far as I know, Spark Streaming can only apply transformation to batches (DStreams mapped to RDDs) and not to the whole file at once (when its finite stream has completed).

Is that correct? If so, what alternative should I consider for my scenario?

Answer

I may have misunderstood your question the first try...

As far as I know, Spark Streaming can only apply transformation to batches (DStreams mapped to RDDs) and not to the whole file at once (when its finite stream has completed).

Is that correct?

No. It is not correct.

Spark Streaming will apply the transformation to the whole file at once, as it was written to HDFS at the time Spark Streaming's batch interval elapsed.

Spark Streaming will take the current content of a file and start processing it.

As soon as a new file gets uploaded I need to process the new file with Spark/SparkSQL

This is almost impossible with Spark out of the box, because its architecture introduces some delay between the moment a file "gets uploaded" and the moment Spark processes it.

You should consider using the brand new and shiny Structured Streaming or the (soon obsolete) Spark Streaming.

Both solutions support watching a directory for new files and triggering a Spark job once a new file gets uploaded (which is exactly your use case).

Quoting Structured Streaming's Input Sources:

In Spark 2.0, there are a few built-in sources.

  • File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
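A minimal Structured Streaming sketch along these lines might look as follows (the schema, column names, HDFS paths, and checkpoint location are illustrative assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("csv-to-parquet-stream").getOrCreate()

// Streaming file sources require an explicit schema; these columns are illustrative
val schema = new StructType()
  .add("id", LongType)
  .add("value", DoubleType)

// Watch the directory for newly added CSV files
val csvStream = spark.readStream
  .schema(schema)
  .option("header", "true")
  .csv("hdfs:///incoming")

// Continuously convert new files to parquet; a checkpoint location is required for file sinks
val query = csvStream.writeStream
  .format("parquet")
  .option("path", "hdfs:///processed")
  .option("checkpointLocation", "hdfs:///checkpoints/csv-to-parquet")
  .start()

query.awaitTermination()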

See also Spark Streaming's Basic Sources:

Besides sockets, the StreamingContext API provides methods for creating DStreams from files as input sources.

File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories not supported).
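For the simpler text-only case there is also textFileStream; here is a minimal sketch, assuming an arbitrary batch interval and placeholder directory paths:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("hdfs-dir-monitor")
// A 30-second batch interval is an arbitrary choice for illustration
val ssc = new StreamingContext(conf, Seconds(30))

// textFileStream is the text-only variant of fileStream
val lines = ssc.textFileStream("hdfs:///incoming")

// Each batch RDD contains the contents of files that appeared during the interval
lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Per-batch processing goes here, e.g. parse CSV lines and write parquet
    println(s"Processed ${rdd.count()} new lines")
  }
}

ssc.start()
ssc.awaitTermination()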

Regarding the following point:

I would need to know when the "file stream" completes.

Don't do this with Spark.

Quoting Spark Streaming's Basic Sources again:

  • The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.

Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

Wrapping up: you should only move the files to the directory that Spark watches when the files are complete and ready for processing with Spark. This is outside the scope of Spark.
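For illustration, one possible way to do that atomic hand-off is to upload each file into a staging directory first and then rename it into the watched directory with the Hadoop FileSystem API (the paths below are placeholders):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Upload or write the file to a staging directory first, then rename it into the
// watched directory. rename is atomic on HDFS, so Spark only ever sees complete files.
val fs = FileSystem.get(new Configuration())
val staged  = new Path("hdfs:///staging/file-0001.csv")
val watched = new Path("hdfs:///incoming/file-0001.csv")

if (!fs.rename(staged, watched)) {
  throw new RuntimeException(s"Failed to move $staged into the watched directory")
}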

