How can I make Spark Streaming count the words in a file in a unit test?


Problem description

I've successfully built a very simple Spark Streaming application in Java that is based on the Scala HdfsWordCount example (https://github.com/apache/spark/blob/branch-1.1/examples/src/main/scala/org/apache/spark/examples/streaming/HdfsWordCount.scala).

When I submit this application to my local Spark, it waits for a file to be written to a given directory, and when I create that file it successfully prints the number of words. I terminate the application by pressing Ctrl+C.

Now I've tried to create a very basic unit test for this functionality, but in the test I was not able to print the same information, that is, the number of words.

What am I missing?

Below is the unit test file, and after that I've also included the code snippet that shows the countWords method:

StarterAppTest.java

import com.google.common.io.Files;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.junit.*;

import java.io.*;

public class StarterAppTest {

  JavaStreamingContext ssc;
  File tempDir;

  @Before
  public void setUp() {
    ssc = new JavaStreamingContext("local", "test", new Duration(3000));
    tempDir = Files.createTempDir();
    tempDir.deleteOnExit();
  }

  @After
  public void tearDown() {
    ssc.stop();
    ssc = null;
  }

  @Test
  public void testInitialization() {
    Assert.assertNotNull(ssc.sc());
  }


  @Test
  public void testCountWords() {

    StarterApp starterApp = new StarterApp();

    try {
      JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
      JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);

      ssc.start();

      File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
      PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
      writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
      writer.close();

      System.err.println("===== Word Counts =======");
      wordCounts.print();
      System.err.println("===== Word Counts =======");

    } catch (FileNotFoundException e) {
      e.printStackTrace();
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }


    Assert.assertTrue(true);

  }

}

This test compiles and starts to run, and Spark Streaming prints a lot of diagnostic messages on the console, but the call to wordCounts.print() does not print anything, whereas in StarterApp.java itself it does.

I've also tried adding ssc.awaitTermination(); after ssc.start(), but nothing changed in that respect. After that I also tried to create a new file manually in the directory that this Spark Streaming application was watching, but this time it gave an error.

For completeness, below is the countWords method:

public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); }
    });

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
              @Override
              public Tuple2<String, Integer> call(String s) { return new Tuple2<>(s, 1); }
            }).reduceByKey((i1, i2) -> i1 + i2);

    return wordCounts;
  }
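Since the method already uses a Java 8 lambda for reduceByKey, the two anonymous inner classes can be collapsed the same way. Below is a sketch of an equivalent lambda-only version, assuming Spark 1.x (where FlatMapFunction returns an Iterable) and the same SPACE pattern and Guava Lists import as the original class:

public JavaPairDStream<String, Integer> countWords(JavaDStream<String> lines) {
  return lines
      .flatMap(x -> Lists.newArrayList(SPACE.split(x))) // split each line into words
      .mapToPair(s -> new Tuple2<>(s, 1))               // pair each word with a count of 1
      .reduceByKey((i1, i2) -> i1 + i2);                // sum the counts per word
}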

Solution

A few pointers:

  • Give at least two cores to the Spark Streaming context: one for the streaming receiver and one for the Spark processing. "local" -> "local[2]"
  • Your streaming interval is 3000 ms, so somewhere in your program you need to wait at least that long before expecting output.
  • Spark Streaming needs some time to set up its listeners. The file is created immediately after ssc.start() is issued, and there is no guarantee that the filesystem listener is already in place. I'd do some sleep(xx) after ssc.start().

In Streaming, it's all about the right timing.
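
Putting the pointers together, the test might be reworked along these lines. This is a minimal sketch, not a definitive fix: the sleep durations are illustrative guesses, and moving wordCounts.print() before ssc.start() is an extra change beyond the pointers above, since output operations registered after the context has started may never be scheduled, which would also explain why nothing was printed:

@Before
public void setUp() {
  // "local[2]": at least two cores, one for the file-monitoring
  // receiver and one for the actual batch processing.
  ssc = new JavaStreamingContext("local[2]", "test", new Duration(3000));
  tempDir = Files.createTempDir();
  tempDir.deleteOnExit();
}

@Test
public void testCountWords() throws Exception {
  StarterApp starterApp = new StarterApp();

  JavaDStream<String> lines = ssc.textFileStream(tempDir.getAbsolutePath());
  JavaPairDStream<String, Integer> wordCounts = starterApp.countWords(lines);

  // Register the output operation before starting the context.
  wordCounts.print();

  ssc.start();

  // Give the filesystem listener some time to come up
  // before the file is written.
  Thread.sleep(1000);

  File tmpFile = new File(tempDir.getAbsolutePath(), "tmp.txt");
  PrintWriter writer = new PrintWriter(tmpFile, "UTF-8");
  writer.println("8-Dec-2014: Emre Emre Emre Ergin Ergin Ergin");
  writer.close();

  // Wait longer than one batch interval (3000 ms) so the batch
  // containing the file is actually processed and printed.
  Thread.sleep(2 * 3000);
}

Fixed sleeps keep the test simple but slow and still somewhat timing-dependent; a more robust variant would collect the counts via foreachRDD into a shared structure and poll it with a deadline. The sketch above is just the smallest change to the original test.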
