How to load tar.gz files in streaming datasets?


Question

I would like to stream from tar-gzip (tgz) files that contain my actual CSV data.

I already managed to do Structured Streaming with Spark 2.2 when my data comes in as plain CSV files, but in reality the data arrives as gzipped CSV files.

Is there a way for the trigger fired by Structured Streaming to decompress the files before handling the CSV stream?

The code I use to process the files is this:

import org.apache.spark.sql.Encoders

val schema = Encoders.product[RawData].schema
val trackerData = spark
  .readStream
  .option("delimiter", "\t")   // tab-separated values
  .schema(schema)
  .csv(path)
val exceptions = trackerData   // fixed: original snippet referenced an undefined `rawCientData`
  .as[String]
  .flatMap(extractExceptions)
  .as[ExceptionData]

This produced output as expected when path points to plain CSV files. But I would like to use tar-gzip files. When I try to place those files at the given path, I do not get any exceptions, and the batch output tells me:

  "sources" : [ {
    "description" : "FileStreamSource[file:/Users/matthias/spark/simple_spark/src/main/resources/zsessionlog*]",
    "startOffset" : null,
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 1095,
    "processedRowsPerSecond" : 211.0233185584891
  } ],

But no actual data gets processed. The console sink looks like this:

+------+---+-----+
|window|id |count|
+------+---+-----+
+------+---+-----+

Answer

I do not think reading tar.gz'ed files is possible in Spark (see "Read whole text files from a compression in Spark" or "gzip support in Spark" for some ideas).

Spark does support gzip files, but they are not recommended as a data format because they are not splittable: each gzip file ends up in a single partition, which in turn leaves Spark with little to no parallelism.
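As a small illustration of why a gzip file is unsplittable (a Python sketch using only the standard library; the data is made up): a gzip stream carries its magic header only at byte 0 and must be decoded from the beginning, so no reader can jump into the middle of the file the way Spark splits plain text files.

```python
import gzip
import zlib

data = b"example,row\n" * 500
blob = gzip.compress(data)

assert blob[:2] == b"\x1f\x8b"        # gzip magic appears only at the start
assert gzip.decompress(blob) == data   # decoding must begin at byte 0

try:
    # Pretend to "split" the file and read only its second half.
    gzip.decompress(blob[len(blob) // 2:])
except (gzip.BadGzipFile, zlib.error):
    print("second half alone is unreadable")
```

This is the reason each `.gz` input lands in exactly one partition.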

In order to have gzipped files loaded in Spark Structured Streaming, you have to specify the path pattern so those files are included in loading, say zsessionlog*.csv.gz or alike. Otherwise, csv alone loads plain CSV files only.

If you insist on using Spark Structured Streaming to handle tar.gz'ed files, you could write a custom streaming data Source to do the un-tar.gz.
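A simpler alternative to a custom Source is to unpack the archives in a separate pre-processing step before the streaming query sees them. A minimal sketch (in Python for illustration; the directory names and the helper are hypothetical, not part of the question's code) extracts each incoming archive into the directory that `.csv(path)` watches, so Spark only ever reads plain CSV files:

```python
import tarfile
from pathlib import Path

def extract_archives(incoming_dir: str, watched_dir: str) -> list[str]:
    """Extract every .tgz / .tar.gz in incoming_dir into watched_dir,
    returning the names of the files that were unpacked."""
    extracted = []
    Path(watched_dir).mkdir(parents=True, exist_ok=True)
    for pattern in ("*.tgz", "*.tar.gz"):
        for archive in sorted(Path(incoming_dir).glob(pattern)):
            with tarfile.open(archive, mode="r:gz") as tar:
                # Unpack the CSV members into the watched directory.
                tar.extractall(path=watched_dir)
                extracted.extend(m.name for m in tar.getmembers() if m.isfile())
    return extracted
```

Run on a schedule (or triggered by file arrival), this keeps the Structured Streaming job itself unchanged.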

Given that gzip files are not recommended as a data format in Spark, the whole idea of using Spark Structured Streaming on them does not make much sense.

