How to specify batch interval in Spark Structured Streaming?


Question

I am going through Spark Structured Streaming and encountered a problem.

In StreamingContext (DStreams), we can define a batch interval as follows:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 5)  # 5-second batch interval; sc is an existing SparkContext

How can I do this in Structured Streaming?

My streaming code looks like this:

from pyspark.sql import SparkSession

sparkStreaming = SparkSession \
    .builder \
    .appName("StreamExample1") \
    .getOrCreate()

stream_df = sparkStreaming.readStream \
    .schema("col0 STRING, col1 INTEGER") \
    .option("maxFilesPerTrigger", 1) \
    .csv("C:/sparkStream")

sql1 = stream_df.groupBy("col0").sum("col1")
query = sql1.writeStream.queryName("stream1").outputMode("complete").format("memory").start()
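
For reference, the memory sink registers the results as an in-memory table named after the query, so the running aggregation can be inspected with ordinary SQL; a minimal sketch (the 5-second sleep is an arbitrary wait for the first micro-batch):

import time

time.sleep(5)  # wait for the first micro-batch to be processed
sparkStreaming.sql("SELECT * FROM stream1").show()  # table name matches queryName("stream1")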

This code works as expected, but how/where do I define the batch interval here?

I am new to Structured Streaming, please guide me.

Answer

tl;dr Use trigger(...) (on the DataStreamWriter, i.e. after writeStream).

The Structured Streaming programming guide is an excellent source: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

There are various options. If you do not set a batch interval, Spark will look for new data as soon as it has finished processing the last batch. Triggers are the way to go here.

From the manual:

The trigger settings of a streaming query define the timing of streaming data processing, whether the query is going to be executed as a micro-batch query with a fixed batch interval or as a continuous processing query.

Some examples:

Default trigger (runs the micro-batch as soon as it can):

df.writeStream \
  .format("console") \
  .start()

ProcessingTime trigger with a two-second micro-batch interval:

df.writeStream \
  .format("console") \
  .trigger(processingTime='2 seconds') \
  .start()

One-time trigger:

df.writeStream \
  .format("console") \
  .trigger(once=True) \
  .start()

Continuous trigger with a one-second checkpointing interval (continuous processing is an experimental mode):

df.writeStream \
  .format("console") \
  .trigger(continuous='1 second') \
  .start()
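
Applied to the query from the question, the interval goes on the writeStream builder before start(); a minimal sketch, assuming sql1 from above and a hypothetical five-second micro-batch interval:

# Rework of the question's query with an explicit 5-second micro-batch interval
query = sql1.writeStream \
    .queryName("stream1") \
    .outputMode("complete") \
    .format("memory") \
    .trigger(processingTime='5 seconds') \
    .start()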
