结构化流-消耗每条消息 [英] Structured Streaming - Consume each message
问题描述
处理通过结构化流水线传输的每条消息(火花"在2.1.1上,源为Kafka 0.10.2.1)的推荐"方式是什么?
What would be the "recommended" way to process each message as it comes through Structured streaming pipeline (i m on spark 2.1.1 with source being Kafka 0.10.2.1) ?
到目前为止,我正在查看dataframe.mapPartitions
(因为我需要连接到HBase,其客户端连接类不可序列化,因此是mapPartitions
).
So far, I am looking at dataframe.mapPartitions
(since i need to connect to HBase, whose client connection classes are not serizalable, hence mapPartitions
).
想法?
推荐答案
您应该能够使用foreach
输出接收器: https://spark.apache. org/docs/latest/structured-streaming-programming-guide.html#using-foreach
You should be able to use a foreach
output sink: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks and https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach
即使客户端不可序列化,也不必在ForeachWriter
构造函数中打开它.只需将其保留为None/null,然后在open
方法中对其进行初始化,该方法在序列化后称为,但每个任务只能执行一次.
Even though the client is not serializable, you don't have to open it in your ForeachWriter
constructor. Just leave it None/null, and initialize it in the open
method, which is called after serialization, but only once per task.
以伪代码排序:
class HBaseForeachWriter extends ForeachWriter[MyType] {
var client: Option[HBaseClient] = None
def open(partitionId: Long, version: Long): Boolean = {
client = Some(... open a client ...)
}
def process(record: MyType) = {
client match {
case None => throw Exception("shouldn't happen")
case Some(cl) => {
... use cl to write record ...
}
}
}
def close(errorOrNull: Throwable): Unit = {
client.foreach(cl => cl.close())
}
}
这篇关于结构化流-消耗每条消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!