pyspark structured streaming write to parquet in batches


Question

I am doing some transformations on a Spark structured streaming dataframe and storing the transformed dataframe as parquet files in HDFS. Now I want the write to HDFS to happen in batches, instead of transforming the whole dataframe first and then storing it.

Answer

Here is a parquet sink example:

# parquet sink example
targetParquetHDFS = (
    sourceTopicKAFKA
    .writeStream
    .format("parquet")    # can be "orc", "json", "csv", etc.
    .outputMode("append") # can only be "append"
    .option("path", "path/to/destination/dir")
    .partitionBy("col")   # if you need to partition
    .trigger(processingTime="...") # "mini-batch" frequency when data is output to the sink
    .option("checkpointLocation", "path/to/checkpoint/dir") # write-ahead logs for recovery purposes
    .start()
)
targetParquetHDFS.awaitTermination()
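
The snippet above assumes that sourceTopicKAFKA is already a streaming dataframe with your transformations applied. A minimal sketch of how such a source might be built (the broker, topic name, and projection below are placeholders, not part of the original question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parquet-sink-example").getOrCreate()

# hypothetical Kafka source; replace broker and topic with your own
sourceTopicKAFKA = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:9092")  # placeholder broker
    .option("subscribe", "source-topic")               # placeholder topic
    .load()
    .selectExpr("CAST(value AS STRING) AS value")      # example transformation
)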

More specific details:

Kafka integration: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

SS programming guide: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks

Ok ... I added some stuff to the response to clarify your question.

SS has a few different trigger types (see the code sketch after the list below):

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

default: the next trigger happens once the previous trigger has completed processing

fixed interval: .trigger(processingTime='10 seconds'), so a 10-second trigger will fire at 00:10, 00:20, 00:30

one-time: processes all available data at once via .trigger(once=True)

continuous / fixed checkpoint interval => best to see programming guide doc
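
As a rough illustration of how the first three options look in code (paths, the interval, and the transformed streaming dataframe df are placeholders; each query uses only one trigger setting, and the two queries below are alternatives shown side by side):

# default trigger: simply omit .trigger(); the next micro-batch starts
# as soon as the previous one has finished.

# fixed-interval micro-batches every 10 seconds
queryFixed = (
    df.writeStream
    .format("parquet")
    .option("path", "path/to/destination/dir")
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .trigger(processingTime="10 seconds")
    .start()
)

# one-time: process all data available right now, then stop
queryOnce = (
    df.writeStream
    .format("parquet")
    .option("path", "path/to/destination/dir")
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .trigger(once=True)
    .start()
)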

Therefore, in your Kafka example SS can process the data on the event-time timestamp in micro-batches via the "default" or "fixed interval" triggers, or as a "one-time" processing of all the data available in the Kafka source topic.

