Reading files from Apache Spark textFileStream
Question
I'm trying to read/monitor txt files from a Hadoop file system directory, but I've noticed that all the txt files inside this directory are themselves directories, as shown in the example below:
/crawlerOutput/b6b95b75148cdac44cd55d93fe2bbaa76aa5cccecf3d723c5e47d361b28663be-1427922269.txt/_SUCCESS
/crawlerOutput/b6b95b75148cdac44cd55d93fe2bbaa76aa5cccecf3d723c5e47d361b28663be-1427922269.txt/part-00000
/crawlerOutput/b6b95b75148cdac44cd55d93fe2bbaa76aa5cccecf3d723c5e47d361b28663be-1427922269.txt/part-00001
I'd like to read all the data inside the part files. I'm trying the following code, as shown in this snippet:
val testData = ssc.textFileStream("/crawlerOutput/*/*")
But, unfortunately, it says that /crawlerOutput/*/* doesn't exist. Doesn't textFileStream accept wildcards? What should I do to solve this problem?
Accepted answer
textFileStream() is just a wrapper for fileStream() and does not support subdirectories (see https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html).
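Since wildcards won't work, one way around this is to create one stream per known subdirectory and union them. A minimal sketch, assuming the subdirectory names are known up front (the directory list, app name, and batch interval here are illustrative, not from the original question):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MonitorKnownDirs {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("crawler-output-monitor")
    val ssc  = new StreamingContext(conf, Seconds(30))

    // Each entry is one flat directory that textFileStream can watch.
    val dirs = Seq(
      "/crawlerOutput/b6b95b75148cdac44cd55d93fe2bbaa76aa5cccecf3d723c5e47d361b28663be-1427922269.txt"
    )

    // One stream per directory, unioned into a single DStream of lines.
    val streams  = dirs.map(ssc.textFileStream)
    val testData = ssc.union(streams)

    testData.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The obvious limitation is that the directory list is fixed at context creation time, which is why detecting new directories requires restarting the context, as described below.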
You would need to list the specific directories to monitor. If you need to detect new directories, a StreamingListener could be used to check for them, then stop the streaming context and restart it with the new values.
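A rough sketch of that listener idea, assuming Hadoop's FileSystem API is available on the driver (the class name, flag, and driver loop are hypothetical scaffolding): after each batch it re-lists /crawlerOutput, and when a subdirectory appears that isn't yet being monitored, it sets a flag so the driver can stop the context and rebuild it with the updated directory list.

```scala
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Registered via ssc.addStreamingListener(...). `known` is the set of
// directories the current context already monitors.
class NewDirListener(fs: FileSystem, known: Set[String], restart: AtomicBoolean)
    extends StreamingListener {

  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val current = fs.listStatus(new Path("/crawlerOutput"))
      .filter(_.isDirectory)
      .map(_.getPath.toString)
      .toSet
    // Any directory we haven't seen yet triggers a restart request;
    // the driver loop checks this flag, stops the context gracefully,
    // and builds a new one over the enlarged directory list.
    if ((current -- known).nonEmpty) restart.set(true)
  }
}
```

Note that stopping and restarting a StreamingContext is relatively heavyweight, so this only makes sense when new directories appear infrequently.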
Just thinking out loud: if you intend to process each subdirectory once and just want to detect these new directories, you could potentially key off another location that contains job info or a file token; once present, it could be consumed in the streaming context, and the appropriate textFile() could be called to ingest the new path.
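The token-file idea could look something like this sketch, assuming the crawler writes a small token file to a hypothetical flat directory (/crawlerOutput/tokens here) containing the path of each finished output directory. The streaming job watches only that directory; inside foreachRDD, which runs on the driver, each reported path is then read in full with textFile():

```scala
// Watch the flat token directory; each new token file contains one or
// more lines, each naming a finished crawler output directory.
val tokens = ssc.textFileStream("/crawlerOutput/tokens")

tokens.foreachRDD { rdd =>
  // collect() is safe here because token files are tiny (just paths).
  val newPaths = rdd.collect()
  newPaths.foreach { path =>
    // textFile on a directory reads all part-* files beneath it.
    val data = rdd.sparkContext.textFile(path)
    // Process `data` here, e.g. count it or write it elsewhere.
  }
}
```

This avoids restarting the context entirely: only the token directory needs to be flat, and arbitrary new subdirectories can be ingested as they are announced.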