How to avoid empty files while writing parquet files?


Problem description

I am reading from a Kafka queue using Spark Structured Streaming. After reading from Kafka I apply a filter on the dataframe and save the filtered dataframe to a parquet file. This generates many empty parquet files. Is there any way I can avoid writing empty files?

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KafkaServer) \
    .option("subscribe", KafkaTopics) \
    .load()

Transaction_DF = df.selectExpr("CAST(value AS STRING)")

decompDF = Transaction_DF.select(zip_extract("value").alias("decompress"))
filterDF = decompDF.filter(.....)

query = filterDF.writeStream \
    .option("path", outputpath) \
    .option("checkpointLocation", RawXMLCheckpoint) \
    .start()

Recommended answer

I recommend using repartition(partitioningColumns) on the DataFrame (or Dataset) and then partitionBy(partitioningColumns) on the writeStream operation to avoid writing empty files.
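
A minimal sketch of that suggestion, continuing the snippet from the question; the partition column msgtype and the explicit "parquet" format are assumptions for illustration, since the original filter and schema are not shown:

# Repartition by the intended partition column so that rows with the same
# value land in the same task, then lay the output out by that column.
# Per the recommendation above, this keeps tasks that receive no data
# from leaving empty parquet files behind.
partitionedDF = filterDF.repartition("msgtype")      # "msgtype" is assumed

query = partitionedDF.writeStream \
    .format("parquet") \
    .partitionBy("msgtype") \
    .option("path", outputpath) \
    .option("checkpointLocation", RawXMLCheckpoint) \
    .start()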

Reason: with a lot of data, the bottleneck is often Spark's read performance when there are many small (or even empty) files and no partitioning, so you should definitely make use of file/directory partitioning (which is not the same as RDD partitioning). This is especially a problem when using AWS S3. The partition columns should fit the common queries you run when reading the data, e.g. timestamp/day, message type/Kafka topic, ...

See also the partitionBy documentation on http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:

year=2016/month=01/, year=2016/month=02/

Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.
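
For illustration, a hedged example of reading such a partitioned layout back; the path and the filter values are assumptions:

# Reading a dataset laid out as .../year=2016/month=01/... :
# the predicate on the partition columns lets Spark skip all other
# year=/month= directories (partition pruning).
eventsDF = spark.read.parquet("/data/events")         # path is assumed
january2016DF = eventsDF.filter("year = 2016 AND month = 1")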

This is applicable to all file-based data sources (e.g. Parquet, JSON) starting with Spark 2.1.0.
