How to get Kafka offsets for structured query for manual and reliable offset management?


Problem description

Spark 2.2 introduced a Kafka structured streaming source. As I understand it, it relies on the HDFS checkpoint directory to store offsets and guarantee "exactly-once" message delivery.
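
For reference, a minimal sketch of such a query (broker address, topic name, and paths are made up for illustration); the offsets end up under checkpointLocation rather than in a Kafka consumer group:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-offsets-demo").getOrCreate()

// Structured streaming source for Kafka (spark-sql-kafka-0-10)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()

// Offsets for every processed batch are tracked in the checkpoint directory
val query = df.writeStream
  .format("parquet")
  .option("path", "/data/events")
  .option("checkpointLocation", "/checkpoints/events-query")
  .start()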

But old docs (like https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) say that Spark Streaming checkpoints are not recoverable across applications or Spark upgrades and hence not very reliable. As a solution, there is a practice of storing offsets in external storage that supports transactions, such as MySQL or RedshiftDB.

If I want to store offsets from the Kafka source in a transactional DB, how can I obtain the offsets from a structured streaming batch?

Previously, this could be done by casting the RDD to HasOffsetRanges:

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    
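
For context, a hedged sketch of how this was typically used with the DStream API; the stream type and the "persist to store" step are illustrative assumptions, not part of the question:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

// stream: created via KafkaUtils.createDirectStream
def commitToExternalStore(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... process rdd ...
    // then persist (topic, partition, untilOffset) to a transactional store,
    // ideally in the same transaction as the processed results
    offsetRanges.foreach(r => println(s"${r.topic} ${r.partition} ${r.untilOffset}"))
  }
}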

But with the new Streaming API, I have a Dataset of InternalRow and I can't find an easy way to fetch the offsets. The Sink API has only the addBatch(batchId: Long, data: DataFrame) method, so how am I supposed to get the offsets for a given batch id?

Recommended answer

The relevant Spark DEV mailing list discussion thread is here.

Summary:

Spark Streaming will support getting offsets in future versions (> 2.2.0). The JIRA ticket to follow: https://issues-test.apache.org/jira/browse/SPARK-18258

For Spark <= 2.2.0, you can get the offsets for a given batch by reading the JSON from the checkpoint directory (the API is not stable, so be cautious):

import org.apache.hadoop.fs.Path
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.execution.streaming.OffsetSeqLog
import org.apache.spark.sql.kafka010.JsonUtilsWrapper

val checkpointRoot: String = ???   // read 'checkpointLocation' from custom sink params
val checkpointDir = new Path(new Path(checkpointRoot), "offsets").toUri.toString
val offsetSeqLog = new OffsetSeqLog(sparkSession, checkpointDir)

// OffsetSeqLog.get(batchId) returns Option[OffsetSeq]; each defined entry holds one
// source's offsets serialized as JSON (batchId is the argument passed to addBatch)
val endOffsets: Map[TopicPartition, Long] = offsetSeqLog.get(batchId)
  .map(_.offsets.flatten.flatMap(offset => JsonUtilsWrapper.jsonToOffsets(offset.json)).toMap)
  .getOrElse(Map.empty)


package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

/**
  * Hack to access the private JsonUtils API.
  * This object must be placed in the org.apache.spark.sql.kafka010 package.
  */
object JsonUtilsWrapper {
  def offsetsToJson(partitionOffsets: Map[TopicPartition, Long]): String = {
    JsonUtils.partitionOffsets(partitionOffsets)
  }

  def jsonToOffsets(str: String): Map[TopicPartition, Long] = {
    JsonUtils.partitionOffsets(str)
  }
}

This endOffsets map will contain the end ("until") offset for each topic/partition. Getting the start offsets is problematic, because you have to read the 'commit' checkpoint dir. But usually you don't care about start offsets, because storing the end offsets is enough for a reliable Spark job restart.
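
As an illustration of why end offsets are sufficient, a sketch (topic name and offset values are made up) of restarting a query from the stored map by feeding it back to the Kafka source as the startingOffsets JSON; note that startingOffsets only takes effect when the query starts without an existing checkpoint:

import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.kafka010.JsonUtilsWrapper

// End offsets previously loaded back from the transactional store (values are made up)
val storedEndOffsets: Map[TopicPartition, Long] =
  Map(new TopicPartition("events", 0) -> 42L, new TopicPartition("events", 1) -> 17L)

// Produces the JSON format the Kafka source expects, e.g. {"events":{"0":42,"1":17}}
val startingOffsetsJson = JsonUtilsWrapper.offsetsToJson(storedEndOffsets)

val restarted = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .option("startingOffsets", startingOffsetsJson)
  .load()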

Please note that you have to store the processed batch id in your storage as well. Spark can re-run a failed batch with the same batch id in some cases, so make sure to initialize the custom sink with the latest processed batch id (which you should read from external storage) and to ignore any batch with id < latestProcessedBatchId. By the way, batch ids are not unique across queries, so you have to store the batch id for each query separately.
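
A minimal sketch of that idempotency check in a custom sink; loadLatestProcessedBatchId and saveResultsAndBatchId are hypothetical helpers for the external store (not Spark APIs), and a real sink would also need a StreamSinkProvider to register it:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

class TransactionalSink(loadLatestProcessedBatchId: () => Long,
                        saveResultsAndBatchId: (DataFrame, Long) => Unit) extends Sink {

  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= loadLatestProcessedBatchId()) {
      // Spark may replay a failed batch under the same id; skip it to stay idempotent
    } else {
      // Persist the results, the batch id (per query), and optionally the end offsets
      // read from the OffsetSeqLog above, all in a single transaction
      saveResultsAndBatchId(data, batchId)
    }
  }
}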
