How to read parquet files using `ssc.fileStream()`? What are the types passed to `ssc.fileStream()`?


Problem description


My understanding of Spark's fileStream() method is that it takes three types as parameters: Key, Value, and Format. In case of text files, the appropriate types are: LongWritable, Text, and TextInputFormat.


First, I want to understand the nature of these types. Intuitively, I would guess that the Key in this case is the line number of the file, and the Value is the text on that line. So, in the following example of a text file:

Hello
Test
Another Test


The first row of the DStream would have a Key of 1 (0?) and a Value of Hello.

Is this correct?
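For reference, a minimal text-file stream with these three type parameters might be sketched as follows (assuming a `StreamingContext` built from an existing `sparkConf`, and a placeholder input directory). Note that with `TextInputFormat`, the `LongWritable` key is actually the byte offset of the line within the file, not the line number:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sparkConf, Seconds(10))

// Key = byte offset of the line in the file, Value = the line's text.
val textStream = ssc.fileStream[LongWritable, Text, TextInputFormat]("/tmp/input")
textStream.map { case (offset, line) => line.toString }.print()
```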


Second part of my question: I looked at the decompiled implementation of ParquetInputFormat and I noticed something curious:

public class ParquetInputFormat<T>
       extends FileInputFormat<Void, T> {
//...

public class TextInputFormat
       extends FileInputFormat<LongWritable, Text>
       implements JobConfigurable {
//...


TextInputFormat extends FileInputFormat of types LongWritable and Text, whereas ParquetInputFormat extends the same class of types Void and T.


Does this mean that I must create a Value class to hold an entire row of my parquet data, and then pass the types <Void, MyClass, ParquetInputFormat<MyClass>> to ssc.fileStream()?


If so, how should I implement MyClass?


EDIT 1: I have noticed a readSupportClass which is to be passed to ParquetInputFormat objects. What kind of class is this, and how is it used to parse the parquet file? Is there some documentation that covers this?


EDIT 2: As far as I can tell, this is impossible. If anybody knows how to stream in parquet files to Spark then please feel free to share...

Answer


My sample to read parquet files in Spark Streaming is below.

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.spark.streaming.{Seconds, StreamingContext}
import parquet.hadoop.ParquetInputFormat

val ssc = new StreamingContext(sparkConf, Seconds(2))
// Tell ParquetInputFormat how to deserialize rows: read each one as an Avro GenericRecord.
ssc.sparkContext.hadoopConfiguration.set("parquet.read.support.class", "parquet.avro.AvroReadSupport")
val stream = ssc.fileStream[Void, GenericRecord, ParquetInputFormat[GenericRecord]](
  directory, { path: Path => path.toString.endsWith("parquet") }, true, ssc.sparkContext.hadoopConfiguration)

val lines = stream.map(row => {
  println("row:" + row.toString())
  row
})
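Building on the snippet above, individual columns can then be read out of each GenericRecord by field name (a sketch; the field name "name" is an assumption about the Avro schema backing the parquet files):

```scala
// Extract a single column from each (Void, GenericRecord) pair;
// "name" is a hypothetical field in the Avro schema.
val names = stream.map { case (_, record) => String.valueOf(record.get("name")) }
names.print()
```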

Some key points are:

  • The record type is GenericRecord
  • The readSupportClass is AvroReadSupport
  • Pass the Configuration to fileStream
  • Set parquet.read.support.class in the Configuration
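To produce test input for such a stream, parquet files can be written with the matching Avro support from parquet-mr. This is only a sketch under assumptions: the schema and output path are hypothetical, and it uses the simple `AvroParquetWriter(Path, Schema)` constructor from the older `parquet.avro` package:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import parquet.avro.AvroParquetWriter

// Hypothetical Avro schema with a single string field "name".
val schema = new Schema.Parser().parse(
  """{"type":"record","name":"Row","fields":[{"name":"name","type":"string"}]}""")

val writer = new AvroParquetWriter[GenericRecord](new Path("/tmp/input/part-0.parquet"), schema)
val record = new GenericData.Record(schema)
record.put("name", "Hello")
writer.write(record)
writer.close() // fileStream only picks up files once they appear fully written in the directory
```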

I referred to the source code below to create this sample.
I also could not find a good example, so I would like to wait for a better one.

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

