How to make Spark Streaming write its output so that Impala can read it?


Problem description

I have the following problem with Spark Streaming API. I am currently streaming input data via Flume to Spark Streaming, with which I plan to do some preprocessing for the data. Then, I'd like to save the data to Hadoop's file system and query it with Impala. However, Spark is writing the data files to separate directories and a new directory is generated for every RDD.

This is a problem because, first of all, the external tables in Impala cannot detect subdirectories, but only files, inside the directory they are pointing to, unless partitioned. Secondly, the new directories are added so fast by Spark that it would be very bad for performance to create a new partition periodically in Impala for every generated directory. On the other hand, if I choose to increase the roll interval of the writes in Spark, so that the directories will be generated less frequently, there will be an added delay until Impala can read the incoming data. This is not acceptable since my system has to support real-time applications. In Hive, I could configure the external tables to also detect the subdirectories without need for partitioning, by using these settings:

set hive.mapred.supports.subdirectories=true;
set mapred.input.dir.recursive=true;

But to my understanding, Impala does not have a feature like this.

I am currently using the following code for reading the data from Flume and writing it to HDFS:

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))).saveAsTextFiles(path)

Here, the variable path determines the prefix of the directory, to which the text files (part-0000 and so on) are added, and the rest of the directory name is a timestamp generated by Spark. I could change the code to something like this:

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8")))
mapStream.foreachRDD(rdd => rdd.saveAsTextFile(path))

In this case the files would be added to the same directory determined by path, but since they are always named part-00000, part-00001, part-00002, etc., the previously generated files would be overwritten. While examining Spark's source code, I noticed that the names of the files are determined by a line in SparkHadoopWriter's open() method:

val outputName = "part-" + numfmt.format(splitID)

And it seems to me that there is no way to manipulate splitID through Spark API. To summarize, my questions are the following:


  • Is there any method to make the external tables in Impala detect subdirectories?
  • If not, is there any method to make Spark write its output files into a single directory or otherwise in a form that is instantly readable by Impala?
  • If not, is there any kind of update expected with Spark to fix this issue, or should I just branch my own version of Spark with which I can decide the names of the files it writes myself?

Recommended answer

I can't speak for Impala.

part-xxxxx is a Hadoop convention which Spark follows. Most tools understand this format, and I would guess that Spark cannot do much about it. The part files need to be unique, and appending the partition number to the file name is a common technique.
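
If what you need is a single flat directory that an Impala external table can point to, one possible workaround (not something this answer spells out, just a hedged sketch of the idea) is to save each batch to a staging directory and then move its part files into the target directory under batch-unique names via the Hadoop FileSystem API. The mapStream variable comes from the question's earlier snippet; targetDir, stagingDir and the naming scheme are hypothetical:

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical locations: targetDir is the flat directory the Impala external
// table points to, stagingDir is a scratch area for per-batch output.
val targetDir  = "/user/hive/warehouse/flume_events"
val stagingDir = "/tmp/spark-staging"

mapStream.foreachRDD { (rdd, time) =>
  val batchPath = s"$stagingDir/batch-${time.milliseconds}"
  rdd.saveAsTextFile(batchPath)                 // writes part-0000x under the staging dir

  val fs = FileSystem.get(rdd.context.hadoopConfiguration)
  fs.listStatus(new Path(batchPath))
    .filter(_.getPath.getName.startsWith("part-"))
    .foreach { status =>
      // prefix each part file with the batch timestamp so names never collide
      val dst = new Path(s"$targetDir/${time.milliseconds}-${status.getPath.getName}")
      fs.rename(status.getPath, dst)
    }
  fs.delete(new Path(batchPath), true)          // drop the now-empty staging dir
}

Since the names carry the batch timestamp they never overwrite each other, and Impala should only need a REFRESH on the table to see the newly moved files.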

I would look in Impala to see how to read part files, as most Hadoop tools generate their output this way.

If one wants to customize the directory structure - though that is not your question - it can easily be achieved, say to change the prefix-timestamp-suffix format. Spark Streaming uses Spark's RDD.saveAsTextFile(..) under the hood, and this can be customized. Here is the code from DStream.scala:

  def saveAsTextFiles(prefix: String, suffix: String = "") {
    val saveFunc = (rdd: RDD[T], time: Time) => {
      val file = rddToFileName(prefix, suffix, time)
      rdd.saveAsTextFile(file)
    }
    this.foreachRDD(saveFunc)
  }
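
As an illustration of that customization, here is a hypothetical helper (not part of Spark, and the name saveAsDailyTextFiles and path layout are made up for this sketch) that groups output under one directory per day, e.g. <prefix>/dt=2014-06-01/143005123, so that a Hive or Impala partition would only have to be added once per day instead of once per batch:

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.streaming.dstream.DStream

def saveAsDailyTextFiles(stream: DStream[String], prefix: String): Unit = {
  stream.foreachRDD { (rdd, time) =>
    // this block runs on the driver once per batch; only saveAsTextFile does distributed work
    val ts      = new Date(time.milliseconds)
    val dayDir  = new SimpleDateFormat("yyyy-MM-dd").format(ts)
    val batchId = new SimpleDateFormat("HHmmssSSS").format(ts)
    rdd.saveAsTextFile(s"$prefix/dt=$dayDir/$batchId")
  }
}

// e.g. saveAsDailyTextFiles(mapStream, "/user/hive/warehouse/flume_events_partitioned")

Whether Impala can then treat dt as a partition column still depends on registering those partitions, which brings you back to question 1, so treat this purely as an example of reshaping the output layout.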
