Kafka streams.allMetadata() method returns empty list


Question


So I am trying to get interactive queries working with Kafka streams. I have Zookeeper and Kafka running locally (on Windows), where I use C:\temp as the storage folder for both Zookeeper and Kafka.

I have set up the topics like this

kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-submit-topic
kafka-topics.bat --zookeeper localhost:2181 --create  --replication-factor 1 --partitions 1 --topic rating-output-topic

Reading I have Done around this Issue

I have read this documentation page : http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application

I have also read the Java example here : https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java

And also read this similar post which initially sounded like the same issue as me : Cannot access KTable from a different app as StateStore

So that is my setup. So what's the issue?

So as I say I am trying to create my own app, which supports interactive queries via a custom Akka HTTP REST API (using RPC calls, as recommended) so that I can query my KTable. The actual stream processing seems to be happening as expected, and I am able to print the results of the KTable and they match what is produced on the topic.

So the Storage side of things seems to be working

The problem seems to arise when attempting to use the Streams.allMetadata() method, where it returns an empty list.

I am using

  • Scala 2.12
  • SBT
  • Akka.Http 10.9 for the REST Api
  • Kafka 11.0
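
For reference, here is a rough sketch of the SBT dependencies implied by the imports in the snippets below; the exact versions aren't stated above, so these numbers are assumptions ("Kafka 11.0" read as 0.11.0.x and "Akka.Http 10.9" as 10.0.9):

scalaVersion := "2.12.3"

// Sketch only: artifact versions are assumptions inferred from the list above
libraryDependencies ++= Seq(
  "org.apache.kafka"             %  "kafka-streams"         % "0.11.0.0",
  "com.typesafe.akka"            %% "akka-http"             % "10.0.9",
  "com.typesafe.akka"            %% "akka-http-spray-json"  % "10.0.9",
  "com.typesafe"                 %  "config"                % "1.3.1",
  "com.fasterxml.jackson.module" %% "jackson-module-scala"  % "2.8.8"
)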

Producer code

Here is the code for my producer

package Processing.Ratings {

  import java.util.concurrent.TimeUnit

  import Entities.Ranking
  import Serialization.JSONSerde
  import Topics.RatingsTopics

  import scala.util.Random
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.common.serialization.Serdes
  import Utils.Settings
  import org.apache.kafka.clients.producer.ProducerConfig

  object RatingsProducerApp extends App {

   run()

    private def run(): Unit = {

      val jSONSerde = new JSONSerde[Ranking]
      val random = new Random
      val producerProps = Settings.createBasicProducerProperties
      val rankingList = List(
        Ranking("jarden@here.com","sacha@here.com", 1.5f),
        Ranking("miro@here.com","mary@here.com", 1.5f),
        Ranking("anne@here.com","margeret@here.com", 3.5f),
        Ranking("frank@here.com","bert@here.com", 2.5f),
        Ranking("morgan@here.com","ruth@here.com", 1.5f))

      producerProps.put(ProducerConfig.ACKS_CONFIG, "all")

      System.out.println("Connecting to Kafka cluster via bootstrap servers " +
        s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")

      // send a random string from List event every 100 milliseconds
      val rankingProducer = new KafkaProducer[String, Array[Byte]](
        producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)

      //while (true) {
      for (i <- 0 to 10) {
        val ranking = rankingList(random.nextInt(rankingList.size))
        val rankingBytes = jSONSerde.serializer().serialize("", ranking)
        System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
        rankingProducer.send(new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
        Thread.sleep(100)
      }

      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        rankingProducer.close(10, TimeUnit.SECONDS)
      }))
    }
  }
}
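
The JSONSerde[Ranking] used above (and the JSONSerde[List[Ranking]] used later in the streams code) comes from a Serialization package that isn't shown in the post. A minimal sketch of what such a Jackson-based Serde might look like (everything here is an assumption) is:

package Serialization

import java.util

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

import scala.reflect.ClassTag

// Minimal sketch of a Jackson-based JSON Serde; the post's actual
// Serialization.JSONSerde is not shown, so this is an assumption.
// Note: because of erasure, generic payloads such as List[Ranking] would need
// a Jackson TypeReference; this sketch only covers plain case classes.
class JSONSerde[T >: Null : ClassTag] extends Serde[T] {

  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()

  override def serializer(): Serializer[T] = new Serializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def serialize(topic: String, data: T): Array[Byte] =
      if (data == null) null else mapper.writeValueAsBytes(data)
    override def close(): Unit = ()
  }

  override def deserializer(): Deserializer[T] = new Deserializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def deserialize(topic: String, data: Array[Byte]): T =
      if (data == null) null
      else mapper.readValue(data, implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
    override def close(): Unit = ()
  }
}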

Streams Code

Here is the streams code

def createRatingStreamsProperties() : Properties = {
  val props = createBasicStreamProperties
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-application")
  props.put(StreamsConfig.CLIENT_ID_CONFIG, "ratings-application-client")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props
}

private def createBasicStreamProperties() : Properties = {
  val props = new Properties()
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  // Records should be flushed every 10 seconds. This is less than the default
  // in order to keep this example interactive.
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
  // For illustrative purposes we disable record caches
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
  props
}

And the actual code

import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream._
import Entities.Ranking
import Serialization.JSONSerde
import Topics.RatingsTopics
import Utils.Settings

package Processing.Ratings {

import Stores.StateStores
import org.apache.kafka.streams.state.HostInfo


class DummyRankingReducer extends Reducer[Ranking] {
  override def apply(value1: Ranking, value2: Ranking): Ranking = {
    value2
  }
}

class RankingByEmailInitializer extends Initializer[List[Ranking]] {
  override def apply(): List[Ranking] = List[Ranking]()
}

class RankingByEmailAggregator extends Aggregator[String, Ranking,List[Ranking]] {
  override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
    value :: aggregate
  }
}


object RatingStreamProcessingApp extends App {

  run()

  private def run() : Unit = {
    val stringSerde = Serdes.String
    val rankingSerde = new JSONSerde[Ranking]
    val listRankingSerde = new JSONSerde[List[Ranking]]
    val builder: KStreamBuilder = new KStreamBuilder
    val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)

    val rankingTable = rankings.groupByKey(stringSerde,rankingSerde)
      .aggregate(
        new RankingByEmailInitializer(),
        new RankingByEmailAggregator(),
        listRankingSerde,
        StateStores.RANKINGS_BY_EMAIL_STORE
      )

    rankingTable.toStream.print()

    val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
    val restEndpoint:HostInfo  = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
    System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
    System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

    // Always (and unconditionally) clean local state prior to starting the processing topology.
    // We opt for this unconditional call here because this will make it easier for you to play around with the example
    // when resetting the application for doing a re-run (via the Application Reset Tool,
    // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
    //
    // The drawback of cleaning up local state prior is that your app must rebuild its local state from scratch, which
    // will take time and will require reading all the state-relevant data from the Kafka cluster over the network.
    // Thus in a production scenario you typically do not want to clean up always as we do here but rather only when it
    // is truly needed, i.e., only under certain conditions (e.g., the presence of a command line flag for your app).
    // See `ApplicationResetExample.java` for a production-like example.
    //streams.cleanUp();
    streams.start()
    val restService = new RatingRestService(streams, restEndpoint)
    restService.start()


    //****************************************************************
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
    //****************************************************************


    val SIZE = streams.allMetadata.size()
    val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

    import org.apache.kafka.streams.state.KeyValueIterator
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

    val range = keyValueStore.all
    val HASNEXT = range.hasNext
    import org.apache.kafka.streams.KeyValue
    while (range.hasNext) {
      val next = range.next
      System.out.println(String.format("key: %s | value: %s", next.key, next.value))
    }

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      streams.close(10, TimeUnit.SECONDS)
      restService.stop
    }))

    //return unit
    ()
  }
}

}

Where I have this config

kafka {
    bootStrapServers = "localhost:9092"
    zooKeepers = "zookeeper:2181"
    schemaRegistryUrl = "http://localhost:8081"
    partition = 0,
    restApiDefaultHostName = "localhost",
    restApiDefaultPort = "8080"
}
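
The Settings object referenced throughout (createBasicProducerProperties, bootStrapServers, restApiDefaultHostName, and so on) isn't shown in the post either. A minimal sketch that reads the kafka config block above via Typesafe Config (the actual object is an assumption; the createRatingStreamsProperties / createBasicStreamProperties methods shown earlier would live here too) could be:

package Utils

import java.util.Properties

import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerConfig

// Minimal sketch of the Settings object used by the producer, streams and REST
// code; key names follow the config block above, everything else is an assumption.
object Settings {

  private val kafkaConfig = ConfigFactory.load().getConfig("kafka")

  val bootStrapServers: String = kafkaConfig.getString("bootStrapServers")
  val restApiDefaultHostName: String = kafkaConfig.getString("restApiDefaultHostName")
  val restApiDefaultPort: Int = kafkaConfig.getString("restApiDefaultPort").toInt

  def createBasicProducerProperties: Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
    props
  }
}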

REST Service stuff

Scala port of the example file : https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/MetadataService.java

package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import Entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._


/**
  * Looks up StreamsMetadata from KafkaStreams
  */
class MetadataService(val streams: KafkaStreams) {


   /**
    * Get the metadata for all of the instances of this Kafka Streams application
    *
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadata() : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Get the metadata for all instances of this Kafka Streams application that currently
    * has the provided store.
    *
    * @param store The store to locate
    * @return List of { @link HostStoreInfo}
    */
  def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {

    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    return mapInstancesToHostStoreInfo(metadata)
  }


  /**
    * Find the metadata for the instance of this Kafka Streams Application that has the given
    * store and would have the given key if it exists.
    *
    * @param store Store to find
    * @param key   The key to find
    * @return { @link HostStoreInfo}
    */
  def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
      throw new NotFoundException(
        s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")

    return new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
  }




  def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {

    metadatas.stream.map[HostStoreInfo](metadata =>
      HostStoreInfo(
        metadata.host(),
        metadata.port,
        metadata.stateStoreNames.asScala.toList))
      .collect(Collectors.toList())
      .asScala.toList
  }



}

And here is the REST Service (I have only attempted to get the "instances" route working at the moment).

package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import Entities.AkkaHttpEntitiesJsonFormats._
import Entities._
import akka.http.scaladsl.marshalling.ToResponseMarshallable

import scala.concurrent.Future


object RestService {
  val DEFAULT_REST_ENDPOINT_HOSTNAME  = "localhost"
}


class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

  val metadataService = new MetadataService(streams)
  var bindingFuture: Future[Http.ServerBinding] = null

  implicit val system = ActorSystem("rating-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher


  def start() : Unit = {
    val emailRegexPattern =  """\w+""".r


    val route =
      path("ratingByEmail" / emailRegexPattern) { email =>
        get {

          //TODO : This would come from Kafka store, either local or remote

          complete(ToResponseMarshallable.apply(List[Ranking](
            Ranking("fred@here.com", "sacha@there.com", 4.0f),
            Ranking("sam@here.com", "sacha@there.com", 2.0f)))
          )
        }
      } ~
      path("instances") {
        get {
          val x = metadataService.streamsMetadata
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
        }
      }


    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))
  }


  def stop() : Unit = {
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }

  def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
      hostStoreInfo.port == hostInfo.port
  }
}
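
The entity classes and spray-json formats imported by the REST layer (Entities._, AkkaHttpEntitiesJsonFormats._) aren't included in the post. A minimal sketch that would satisfy those imports, with field names taken from how the classes are used elsewhere in the post and everything else assumed, might be:

package Entities

import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

// Sketch of the entities used by the producer, streams and REST code.
// Ranking's field names (other than toEmail) and HostStoreInfo's storeNames are assumptions.
case class Ranking(fromEmail: String, toEmail: String, score: Float)

case class HostStoreInfo(host: String, port: Int, storeNames: List[String])

object AkkaHttpEntitiesJsonFormats {
  implicit val rankingFormat: RootJsonFormat[Ranking] = jsonFormat3(Ranking)
  implicit val hostStoreInfoFormat: RootJsonFormat[HostStoreInfo] = jsonFormat3(HostStoreInfo)
}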

Here is proof that there is data in the store

producer running (screenshot)

streams running (screenshot)

This is me having run the producer first, then the streams, and then the producer again (another run).

See how the results from the KTable are being shown; I then started the producer and pushed some more messages through the topic, which the streams picked up.

But when I query my REST endpoint to try to get the metadata using localhost:8080/instances, all I get is an empty list []

I would have expected these lines from the streams code above to return some metadata; there is clearly something in the store, so why is there no metadata?

val SIZE = streams.allMetadata.size()
val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

Both of these return 0, whilst iterating through the items in the store using this code

import org.apache.kafka.streams.state.KeyValueIterator
import org.apache.kafka.streams.state.QueryableStoreTypes
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

val range = keyValueStore.all
val HASNEXT = range.hasNext
import org.apache.kafka.streams.KeyValue
while (range.hasNext) {
  val next = range.next
  System.out.println(String.format("key: %s | value: %s", next.key, next.value))
}

Produces data from the store

I know the REST API is working OK, as the hardcoded test route is working fine

What am I doing wrong???

Solution

So I figured this out; it turns out it was due to this missing config value

props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,  "localhost:8080")
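
The value advertised here has to match the host and port the Akka HTTP endpoint actually binds to. A sketch of createBasicStreamProperties with the fix in place (the exact placement, and reusing the restApiDefaultHostName / restApiDefaultPort values from the config block, are assumptions) looks like this:

private def createBasicStreamProperties() : Properties = {
  val props = new Properties()
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
  // Advertise this instance's RPC endpoint so allMetadata()/allMetadataForStore()
  // have something to report; it must match what the REST layer binds to
  props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
    s"${restApiDefaultHostName}:${restApiDefaultPort}")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
  props
}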

Once I added that, the Akka HTTP REST API http://localhost:8080/instances started to work. But then I started getting this weird exception

org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-key-value-store, may have migrated to another instance.
    at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
    at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
    at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)

So after reading about this one here : http://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

I decided that what I needed to do was carry out some retry logic, which I did like this:

Retry

Which I borrowed from here : https://gist.github.com/Mortimerp9/5430595

package Utils

import scala.concurrent._
import scala.concurrent.duration._


object Retry {

  /**
    * exponential back off for retry
    */
  def exponentialBackoff(r: Int): Duration = scala.math.pow(2, r).round * 500 milliseconds

  def noIgnore(t: Throwable): Boolean = false

  /**
    * retry a particular block that can fail
    *
    * @param maxRetry  how many times to retry before giving up
    * @param deadline   how long to retry before giving up; default None
    * @param backoff        a back-off function that returns a Duration after which to retry. default is an exponential backoff in 500 millisecond steps
    * @param ignoreThrowable        if you want to stop retrying on a particular exception
    * @param block  a block of code to retry
    * @param ctx    an execution context where to execute the block
    * @return  an eventual Future succeeded with the value computed or failed with one of:
    *   `TooManyRetriesException`   if there were too many retries without an exception being caught. Probably impossible if you pass decent parameters
    *   `DeadlineExceededException` if the retry didn't succeed before the provided deadline
    *   `TimeoutException`  if you provide a deadline and the block takes too long to execute
    *   `Throwable` the last encountered exception
    */
  def retry[T](maxRetry: Int,
               deadline: Option[Deadline] = None,
               backoff: (Int) => Duration = exponentialBackoff,
               ignoreThrowable: Throwable => Boolean = noIgnore)(block: => T)(implicit ctx: ExecutionContext): Future[T] = {

    class TooManyRetriesException extends Exception("too many retries without exception")
    class DeadlineExceededException extends Exception("deadline exceeded")

    val p = Promise[T]

    def recursiveRetry(retryCnt: Int, exception: Option[Throwable])(f: () => T): Option[T] = {
      if (maxRetry == retryCnt
        || deadline.isDefined && deadline.get.isOverdue) {
        exception match {
          case Some(t) =>
            p failure t
          case None if deadline.isDefined && deadline.get.isOverdue =>
            p failure (new DeadlineExceededException)
          case None =>
            p failure (new TooManyRetriesException)
        }
        None
      } else {
        val success = try {
          val rez = if (deadline.isDefined) {
            Await.result(Future(f()), deadline.get.timeLeft)
          } else {
            f()
          }
          Some(rez)
        } catch {
          case t: Throwable if !ignoreThrowable(t) =>
            blocking {
              val interval = backoff(retryCnt).toMillis
              Thread.sleep(interval)
            }
            recursiveRetry(retryCnt + 1, Some(t))(f)
          case t: Throwable =>
            p failure t
            None
        }
        success match {
          case Some(v) =>
            p success v
            Some(v)
          case None => None
        }
      }
    }

    def doBlock() = block

    Future {
      recursiveRetry(0, None)(doBlock)
    }

    p.future
  }

}
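
The waitUntilStoreIsQueryable helper used in the next snippet isn't shown in the post either. A possible sketch built on the Retry object above (the retry count and the overall timeout are assumptions) is:

import Utils.Retry
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.QueryableStoreType

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Sketch: keep calling streams.store(...) until it stops throwing
// InvalidStateStoreException (i.e. the store is queryable), using Retry above.
// The maxRetry and timeout values are assumptions.
def waitUntilStoreIsQueryable[T](storeName: String,
                                 queryableStoreType: QueryableStoreType[T],
                                 streams: KafkaStreams): Try[T] = {
  Try {
    Await.result(
      Retry.retry(maxRetry = 10) {
        streams.store(storeName, queryableStoreType)
      },
      30.seconds
    )
  }
}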

Which I call like this

def printStoreMetaData(streams:KafkaStreams) : Unit = {

    import org.apache.kafka.streams.state.KeyValueIterator
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    import scala.util.{Success, Failure}

    val keyValueStoreTry = waitUntilStoreIsQueryable(
      StateStores.RANKINGS_BY_EMAIL_STORE,
      QueryableStoreTypes.keyValueStore[String,List[Ranking]](),
      streams
    ) match {
      case Success(keyValueStore) => {
        val SIZE = streams.allMetadata.size()
        val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()
        val range = keyValueStore.all
        val HASNEXT = range.hasNext
        import org.apache.kafka.streams.KeyValue
        while (range.hasNext) {
          val next = range.next
          System.out.println(String.format("key: %s | value: %s", next.key, next.value))
        }
      }
      case Failure(f) => println(f)
    }

}

After doing that, it's all happy days for me.
