Spark文件流问题 [英] Spark FileStreaming issue
本文介绍了Spark文件流问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试使用SparkStreaming(Spark-Streaming_2.10,版本:1.5.1)的简单文件流传输示例
public class DStreamExample {
public static void main(final String[] args) {
final SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SparkJob");
sparkConf.setMaster("local[4]"); // for local
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
final JavaStreamingContext ssc = new JavaStreamingContext(sc,
new Duration(2000));
final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");
lines.print();
ssc.start();
ssc.awaitTermination();
}
}
当我在单个文件或控制器上运行此代码时,它不打印文件中的任何内容,我在日志中看到其不断轮询,但没有打印任何内容。此程序运行时,我尝试将文件移动到目录。
我是不是遗漏了什么?我尝试在行RDD上应用map函数,但也不起作用。
推荐答案
TextFileStream接口不是读取现有的目录内容,而是监视给定的Hadoop兼容的文件系统路径是否有变化,必须通过将文件从同一文件系统内的另一个位置移动到监视位置来写入文件。 简而言之,您订阅的是目录更改,并且将收到受监视位置内新显示的文件的内容-在该状态下,文件在监视快照的时刻(在您的情况下为2000毫秒持续时间)出现,并且任何进一步的文件更新都不会到达流,只有目录更新(新文件)可以。
模拟更新的方法是在监控会话期间创建新文件:
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.io.File;
import java.io.IOException;
import java.util.List;
public class DStreamExample {
public static void main(final String[] args) throws IOException {
final SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SparkJob");
sparkConf.setMaster("local[4]"); // for local
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
final JavaStreamingContext ssc = new JavaStreamingContext(sc,
new Duration(2000));
final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");
// spawn the thread which will create new file within the monitored directory soon
Runnable r = () -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
FileUtils.write(new File("/opt/test/newfile1"), "whatever");
} catch (IOException e) {
e.printStackTrace();
}
};
new Thread(r).start();
lines.foreachRDD((Function<JavaRDD<String>, Void>) rdd -> {
List<String> lines1 = rdd.collect();
lines1.stream().forEach(l -> System.out.println(l));
return null;
});
ssc.start();
ssc.awaitTermination();
}
}
这篇关于Spark文件流问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文