Structured Streaming - Consume each message


Question

What would be the "recommended" way to process each message as it comes through a Structured Streaming pipeline? (I'm on Spark 2.1.1 with the source being Kafka 0.10.2.1.)

So far, I am looking at dataframe.mapPartitions (since I need to connect to HBase, whose client connection classes are not serializable, hence mapPartitions).
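For reference, a minimal sketch of that mapPartitions idea, assuming a Kafka source read as strings; HBaseClient here is a hypothetical stand-in for the real, non-serializable HBase connection classes, and the broker address and topic are placeholders:

import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for the non-serializable HBase client classes.
class HBaseClient {
  def put(row: String): Unit = println(s"put: $row")
  def close(): Unit = ()
}

val spark = SparkSession.builder.appName("demo").getOrCreate()
import spark.implicits._

val messages = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092") // placeholder
  .option("subscribe", "mytopic")                 // placeholder
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]

// The client is constructed inside the partition function, on the executor,
// so it is never captured in a serialized closure. The partition is
// materialized eagerly so the client can be closed safely afterwards.
val processed = messages.mapPartitions { rows =>
  val client = new HBaseClient
  try rows.map { row => client.put(row); row }.toVector.iterator
  finally client.close()
}

// A streaming Dataset still needs an output sink before the query starts:
processed.writeStream.format("console").start().awaitTermination()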

Thoughts?

Answer

You should be able to use a foreach output sink: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks and https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

Even though the client is not serializable, you don't have to open it in your ForeachWriter constructor. Just leave it None/null, and initialize it in the open method, which is called after serialization, but only once per task.

Roughly, in pseudo-code:

class HBaseForeachWriter extends ForeachWriter[MyType] {
  var client: Option[HBaseClient] = None

  // Called on the executor after deserialization, once per partition and epoch.
  def open(partitionId: Long, version: Long): Boolean = {
    client = Some(... open a client ...)
    true // signal that this partition should be processed
  }

  def process(record: MyType): Unit = {
    client match {
      case None => throw new IllegalStateException("client not initialized; open() should have run first")
      case Some(cl) =>
        ... use cl to write record ...
    }
  }

  def close(errorOrNull: Throwable): Unit = {
    client.foreach(cl => cl.close())
  }
}
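To run it, the writer is attached as the query's sink via DataStreamWriter.foreach; a minimal sketch, assuming the source stream is a Dataset[MyType] named messages:

// Attach the writer as the output sink and start the streaming query.
val query = messages.writeStream
  .foreach(new HBaseForeachWriter)
  .start()

query.awaitTermination() // block until the query stops or fails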
