spark streaming fileStream


Problem description

I'm programming with Spark Streaming but am having some trouble with the Scala types. I'm trying to use the function StreamingContext.fileStream.

The definition of this function is like this:

def fileStream[K, V, F <: InputFormat[K, V]](directory: String)(implicit arg0: ClassManifest[K], arg1: ClassManifest[V], arg2: ClassManifest[F]): DStream[(K, V)]

Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. File names starting with . are ignored.

K - Key type for reading the HDFS file
V - Value type for reading the HDFS file
F - Input format for reading the HDFS file
directory - HDFS directory to monitor for new files

I don't know how to pass the types of Key and Value. My code in Spark Streaming:

val ssc = new StreamingContext(args(0), "StreamingReceiver", Seconds(1),
  System.getenv("SPARK_HOME"), Seq("/home/mesos/StreamingReceiver.jar"))

// Create a file stream that monitors the given directory for new files
val lines = ssc.fileStream("/home/sequenceFile")

Java code to write the hadoop file:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class MyDriver {

    private static final String[] DATA = { "One, two, buckle my shoe",
            "Three, four, shut the door", "Five, six, pick up sticks",
            "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };

    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
                    value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                        value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }

}

Recommended answer

If you want to use fileStream, you're going to have to supply all 3 type params to it when calling it. You need to know what your Key, Value and InputFormat types are before calling it. If your types were LongWritable, Text and TextInputFormat, you would call fileStream like so:

val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/sequenceFile")

If those 3 types do happen to be your types, then you might want to use textFileStream instead as it does not require any type params and delegates to fileStream using those 3 types I mentioned. Using that would look like this:

val lines = ssc.textFileStream("/home/sequenceFile")
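Note, though, that the Java writer in the question produces a SequenceFile with IntWritable keys and Text values, not plain text, so TextInputFormat would not parse it. A sketch of a fileStream call whose type parameters match that writer, assuming Hadoop's SequenceFileInputFormat from the mapreduce API is available (ssc is the StreamingContext from the question):

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

// Key/value types must match what SequenceFile.Writer wrote:
// IntWritable keys and Text values, read via SequenceFileInputFormat.
val pairs = ssc.fileStream[IntWritable, Text,
  SequenceFileInputFormat[IntWritable, Text]]("/home/sequenceFile")

// Writables are mutable and reused by Hadoop, so convert them
// to plain Scala values before any further processing.
val lines = pairs.map { case (k, v) => (k.get, v.toString) }
```

The conversion in the final map matters because Hadoop reuses the same Writable instances across records; holding references to them across batches would yield stale data.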
