Kafka streams.allMetadata() method returns empty list
Question
So I am trying to get interactive queries working with Kafka Streams. I have Zookeeper and Kafka running locally (on Windows), using C:\temp as the storage folder for both Zookeeper and Kafka.
I have set up the topics like this:
kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-submit-topic
kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-output-topic
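To double-check that the topics exist, the same tool can describe them:

kafka-topics.bat --zookeeper localhost:2181 --describe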
I have read this documentation page: http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
I have also read the Java example here: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java
And I have also read this similar post, which initially sounded like the same issue as mine: Cannot access KTable from a different app as StateStore
So that is my setup; what is the problem?

So as I say, I am trying to create my own app which allows interactive queries using a custom Akka HTTP REST API (RPC calls, as recommended) to allow me to query my KTable. The actual stream processing seems to be happening as expected, and I am able to print the results of the KTable, and they match what is produced on the topic.
So the storage side of things seems to be working.
The problem arises when attempting to use the streams.allMetadata() method, which returns an empty list.
I am using:

- Scala 2.12
- SBT
- Akka.Http 10.9 for the REST API
- Kafka 11.0
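For reference, a rough sketch of the build dependencies this implies (the exact version strings below are assumptions, not copied from my real build.sbt):

// Hypothetical build.sbt excerpt; versions are inferred from the list above
libraryDependencies ++= Seq(
  "org.apache.kafka"  %  "kafka-streams"        % "0.11.0.0",
  "org.apache.kafka"  %  "kafka-clients"        % "0.11.0.0",
  "com.typesafe.akka" %% "akka-http"            % "10.0.9",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.9"
)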
Producer code

Here is the code for my producer:
package Processing.Ratings {

  import java.util.concurrent.TimeUnit

  import Entities.Ranking
  import Serialization.JSONSerde
  import Topics.RatingsTopics

  import scala.util.Random
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.common.serialization.Serdes
  import Utils.Settings
  import org.apache.kafka.clients.producer.ProducerConfig

  object RatingsProducerApp extends App {

    run()

    private def run(): Unit = {
      val jSONSerde = new JSONSerde[Ranking]
      val random = new Random
      val producerProps = Settings.createBasicProducerProperties
      val rankingList = List(
        Ranking("jarden@here.com", "sacha@here.com", 1.5f),
        Ranking("miro@here.com", "mary@here.com", 1.5f),
        Ranking("anne@here.com", "margeret@here.com", 3.5f),
        Ranking("frank@here.com", "bert@here.com", 2.5f),
        Ranking("morgan@here.com", "ruth@here.com", 1.5f))

      producerProps.put(ProducerConfig.ACKS_CONFIG, "all")

      System.out.println("Connecting to Kafka cluster via bootstrap servers " +
        s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")

      // send a random ranking from the list every 100 milliseconds
      val rankingProducer = new KafkaProducer[String, Array[Byte]](
        producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)

      //while (true) {
      for (i <- 0 to 10) {
        val ranking = rankingList(random.nextInt(rankingList.size))
        val rankingBytes = jSONSerde.serializer().serialize("", ranking)
        System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
        rankingProducer.send(new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
        Thread.sleep(100)
      }

      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        rankingProducer.close(10, TimeUnit.SECONDS)
      }))
    }
  }
}
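The producer above leans on a few helper types that are not shown in this post (Ranking, RatingsTopics, StateStores). Here is a minimal sketch of what they might look like; the field names and string values are inferred from the rest of the code, not copied from my real project:

package Entities {
  // Hypothetical shape, inferred from Ranking("jarden@here.com", "sacha@here.com", 1.5f)
  case class Ranking(fromEmail: String, toEmail: String, score: Float)
}

package Topics {
  object RatingsTopics {
    val RATING_SUBMIT_TOPIC = "rating-submit-topic"
    val RATING_OUTPUT_TOPIC = "rating-output-topic"
  }
}

package Stores {
  object StateStores {
    // assumed store name; any stable string works as long as it matches the aggregate() call
    val RANKINGS_BY_EMAIL_STORE = "rankings-by-email-store"
  }
}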
Streams code

Here is the streams code:
def createRatingStreamsProperties() : Properties = {
  val props = createBasicStreamProperties
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-application")
  props.put(StreamsConfig.CLIENT_ID_CONFIG, "ratings-application-client")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props
}

private def createBasicStreamProperties() : Properties = {
  val props = new Properties()
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  // Records should be flushed every 10 seconds. This is less than the default
  // in order to keep this example interactive.
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
  // For illustrative purposes we disable record caches
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
  props
}
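The producer earlier also calls Settings.createBasicProducerProperties, which is not shown. A plausible sketch, assuming it simply mirrors the stream properties above:

// Assumed shape of the producer-side helper (not shown in this post)
def createBasicProducerProperties: Properties = {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
  props
}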
And here is the actual processing code:
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream._
import Entities.Ranking
import Serialization.JSONSerde
import Topics.RatingsTopics
import Utils.Settings

package Processing.Ratings {

  import Stores.StateStores
  import org.apache.kafka.streams.state.HostInfo

  class DummyRankingReducer extends Reducer[Ranking] {
    override def apply(value1: Ranking, value2: Ranking): Ranking = {
      value2
    }
  }

  class RankingByEmailInitializer extends Initializer[List[Ranking]] {
    override def apply(): List[Ranking] = List[Ranking]()
  }

  class RankingByEmailAggregator extends Aggregator[String, Ranking, List[Ranking]] {
    override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
      value :: aggregate
    }
  }

  object RatingStreamProcessingApp extends App {

    run()

    private def run() : Unit = {
      val stringSerde = Serdes.String
      val rankingSerde = new JSONSerde[Ranking]
      val listRankingSerde = new JSONSerde[List[Ranking]]
      val builder: KStreamBuilder = new KStreamBuilder
      val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)

      val rankingTable = rankings.groupByKey(stringSerde, rankingSerde)
        .aggregate(
          new RankingByEmailInitializer(),
          new RankingByEmailAggregator(),
          listRankingSerde,
          StateStores.RANKINGS_BY_EMAIL_STORE
        )

      rankingTable.toStream.print()

      val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
      val restEndpoint: HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
      System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
      System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

      // Always (and unconditionally) clean local state prior to starting the processing topology.
      // We opt for this unconditional call here because this will make it easier for you to play around with the example
      // when resetting the application for doing a re-run (via the Application Reset Tool,
      // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
      //
      // The drawback of cleaning up local state prior is that your app must rebuild its local state from scratch, which
      // will take time and will require reading all the state-relevant data from the Kafka cluster over the network.
      // Thus in a production scenario you typically do not want to clean up always as we do here but rather only when it
      // is truly needed, i.e., only under certain conditions (e.g., the presence of a command line flag for your app).
      // See `ApplicationResetExample.java` for a production-like example.
      //streams.cleanUp();
      streams.start()

      val restService = new RatingRestService(streams, restEndpoint)
      restService.start()

      //****************************************************************
      // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
      // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
      // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
      // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
      //****************************************************************

      val SIZE = streams.allMetadata.size()
      val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

      import org.apache.kafka.streams.state.KeyValueIterator
      import org.apache.kafka.streams.state.QueryableStoreTypes
      import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

      val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

      val range = keyValueStore.all
      val HASNEXT = range.hasNext
      import org.apache.kafka.streams.KeyValue
      while (range.hasNext) {
        val next = range.next
        System.out.println(String.format("key: %s | value: %s", next.key, next.value))
      }

      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        streams.close(10, TimeUnit.SECONDS)
        restService.stop
      }))

      //return unit
      ()
    }
  }
}
Where I have this configuration:
kafka {
  bootStrapServers = "localhost:9092"
  zooKeepers = "zookeeper:2181"
  schemaRegistryUrl = "http://localhost:8081"
  partition = 0,
  restApiDefaultHostName = "localhost",
  restApiDefaultPort = "8080"
}
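And a sketch of the Settings object that reads it (an assumption on my part that it uses Typesafe Config, since the real one is not shown; the createXXXProperties helpers from earlier would live here too):

import com.typesafe.config.ConfigFactory

object Settings {
  private val cfg = ConfigFactory.load()
  val bootStrapServers: String = cfg.getString("kafka.bootStrapServers")
  val restApiDefaultHostName: String = cfg.getString("kafka.restApiDefaultHostName")
  val restApiDefaultPort: Int = cfg.getInt("kafka.restApiDefaultPort")
}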
The REST service stuff

This is a Scala port of the example file: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/MetadataService.java
package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import Entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._

/**
  * Looks up StreamsMetadata from KafkaStreams
  */
class MetadataService(val streams: KafkaStreams) {

  /**
    * Get the metadata for all of the instances of this Kafka Streams application
    *
    * @return List of {@link HostStoreInfo}
    */
  def streamsMetadata() : List[HostStoreInfo] = {
    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    mapInstancesToHostStoreInfo(metadata)
  }

  /**
    * Get the metadata for all instances of this Kafka Streams application that currently
    * has the provided store.
    *
    * @param store The store to locate
    * @return List of {@link HostStoreInfo}
    */
  def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {
    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    mapInstancesToHostStoreInfo(metadata)
  }

  /**
    * Find the metadata for the instance of this Kafka Streams Application that has the given
    * store and would have the given key if it exists.
    *
    * @param store Store to find
    * @param key The key to find
    * @return {@link HostStoreInfo}
    */
  def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
      throw new NotFoundException(
        s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")
    new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
  }

  def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
    metadatas.stream.map[HostStoreInfo](metadata =>
      HostStoreInfo(
        metadata.host(),
        metadata.port,
        metadata.stateStoreNames.asScala.toList))
      .collect(Collectors.toList())
      .asScala.toList
  }
}
And here is the REST service (at the moment I have only attempted to get the "instances" route working):
package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import Entities.AkkaHttpEntitiesJsonFormats._
import Entities._
import akka.http.scaladsl.marshalling.ToResponseMarshallable

import scala.concurrent.Future

object RestService {
  val DEFAULT_REST_ENDPOINT_HOSTNAME = "localhost"
}

class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

  val metadataService = new MetadataService(streams)
  var bindingFuture: Future[Http.ServerBinding] = null

  implicit val system = ActorSystem("rating-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def start() : Unit = {
    val emailRegexPattern = """\w+""".r

    val route =
      path("ratingByEmail" / emailRegexPattern) { email =>
        get {
          //TODO : This would come from Kafka store, either local or remote
          complete(ToResponseMarshallable.apply(List[Ranking](
            Ranking("fred@here.com", "sacha@there.com", 4.0f),
            Ranking("sam@here.com", "sacha@there.com", 2.0f)))
          )
        }
      } ~
      path("instances") {
        get {
          val x = metadataService.streamsMetadata
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
        }
      }

    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))
  }

  def stop() : Unit = {
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }

  def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
      hostStoreInfo.port == hostInfo.port
  }
}
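Hitting the route is then just a plain GET, e.g.:

curl http://localhost:8080/instances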
Here is proof that there is data in the store

(screenshot: the producer running)

(screenshot: the streams running)

This is me having run the producer first, then the streams, and then the producer again (another run).
See how the results from the KTable are being shown; then I started the producer and pushed some more messages through the topic, which the streams picked up.
But when I query my REST endpoint to try to get the metadata using localhost:8080/instances, all I get is an empty list [].
I would have expected these lines from the streams code above to return some metadata. There is clearly something in the store, so why is there no metadata?
val SIZE = streams.allMetadata.size()
val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()
Both of these return 0, whilst iterating through the items in the store using this code
import org.apache.kafka.streams.state.KeyValueIterator
import org.apache.kafka.streams.state.QueryableStoreTypes
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

val range = keyValueStore.all
val HASNEXT = range.hasNext
import org.apache.kafka.streams.KeyValue
while (range.hasNext) {
  val next = range.next
  System.out.println(String.format("key: %s | value: %s", next.key, next.value))
}
yields data from the store.

(screenshot: data from the store)

I know the REST API is working OK, as the hardcoded test route works fine.
What am I doing wrong??
Answer
So I figured this out; it turns out it was due to this missing config value:
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080")
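That setting is how each Kafka Streams instance advertises the host:port that its interactive-query RPC layer (here, the Akka HTTP endpoint) is reachable on; without it the metadata APIs have nothing to report. In my stream properties it ends up as something like this (a sketch, using the Settings values from the question):

// advertise this instance's REST endpoint for interactive queries;
// assumes the restApiDefaultHostName/restApiDefaultPort settings shown earlier
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG,
  s"${Settings.restApiDefaultHostName}:${Settings.restApiDefaultPort}")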
Once I added that, the Akka HTTP REST API at http://localhost:8080/instances started to work. But then I started getting this weird exception:
org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-key-value-store, may have migrated to another instance.
at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)
So after reading about this here: http://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance I decided that what I needed to do was carry out some retry logic, which I did like this:
Retry

I borrowed this from here: https://gist.github.com/Mortimerp9/5430595
package Utils

import scala.concurrent._
import scala.concurrent.duration._

object Retry {

  /**
    * exponential back off for retry
    */
  def exponentialBackoff(r: Int): Duration = scala.math.pow(2, r).round * 500 milliseconds

  def noIgnore(t: Throwable): Boolean = false

  /**
    * retry a particular block that can fail
    *
    * @param maxRetry how many times to retry before giving up
    * @param deadline how long to retry before giving up; default None
    * @param backoff a back-off function that returns a Duration after which to retry. default is an exponential backoff at 500 millisecond steps
    * @param ignoreThrowable if you want to stop retrying on a particular exception
    * @param block a block of code to retry
    * @param ctx an execution context where to execute the block
    * @return an eventual Future succeeded with the value computed or failed with one of:
    *   `TooManyRetriesException` if there were too many retries without an exception being caught. Probably impossible if you pass decent parameters
    *   `DeadlineExceededException` if the retry didn't succeed before the provided deadline
    *   `TimeoutException` if you provide a deadline and the block takes too long to execute
    *   `Throwable` the last encountered exception
    */
  def retry[T](maxRetry: Int,
               deadline: Option[Deadline] = None,
               backoff: (Int) => Duration = exponentialBackoff,
               ignoreThrowable: Throwable => Boolean = noIgnore)(block: => T)(implicit ctx: ExecutionContext): Future[T] = {

    class TooManyRetriesException extends Exception("too many retries without exception")
    class DeadlineExceededException extends Exception("deadline exceeded")

    val p = Promise[T]

    def recursiveRetry(retryCnt: Int, exception: Option[Throwable])(f: () => T): Option[T] = {
      if (maxRetry == retryCnt
        || deadline.isDefined && deadline.get.isOverdue) {
        exception match {
          case Some(t) =>
            p failure t
          case None if deadline.isDefined && deadline.get.isOverdue =>
            p failure (new DeadlineExceededException)
          case None =>
            p failure (new TooManyRetriesException)
        }
        None
      } else {
        val success = try {
          val rez = if (deadline.isDefined) {
            Await.result(Future(f()), deadline.get.timeLeft)
          } else {
            f()
          }
          Some(rez)
        } catch {
          case t: Throwable if !ignoreThrowable(t) =>
            blocking {
              val interval = backoff(retryCnt).toMillis
              Thread.sleep(interval)
            }
            recursiveRetry(retryCnt + 1, Some(t))(f)
          case t: Throwable =>
            p failure t
            None
        }
        success match {
          case Some(v) =>
            p success v
            Some(v)
          case None => None
        }
      }
    }

    def doBlock() = block

    Future {
      recursiveRetry(0, None)(doBlock)
    }

    p.future
  }
}
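printStoreMetaData below uses a waitUntilStoreIsQueryable helper that I have not shown; a minimal sketch built on the Retry utility above could look like this (the maxRetry and timeout values are arbitrary):

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.QueryableStoreType

// keep calling streams.store(...) until it stops throwing InvalidStateStoreException,
// then hand back the store wrapped in a Try for the caller to pattern match on
def waitUntilStoreIsQueryable[T](storeName: String,
                                 queryableStoreType: QueryableStoreType[T],
                                 streams: KafkaStreams): Try[T] = {
  import scala.concurrent.ExecutionContext.Implicits.global
  Try {
    Await.result(
      Retry.retry(maxRetry = 10) {
        streams.store(storeName, queryableStoreType)
      },
      60.seconds)
  }
}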
Which I then call like this:
def printStoreMetaData(streams: KafkaStreams) : Unit = {

  import org.apache.kafka.streams.state.KeyValueIterator
  import org.apache.kafka.streams.state.QueryableStoreTypes
  import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
  import scala.util.{Failure, Success}

  val keyValueStoreTry = waitUntilStoreIsQueryable(
    StateStores.RANKINGS_BY_EMAIL_STORE,
    QueryableStoreTypes.keyValueStore[String, List[Ranking]](),
    streams
  ) match {
    case Success(keyValueStore) => {
      val SIZE = streams.allMetadata.size()
      val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()
      val range = keyValueStore.all
      val HASNEXT = range.hasNext
      import org.apache.kafka.streams.KeyValue
      while (range.hasNext) {
        val next = range.next
        System.out.println(String.format("key: %s | value: %s", next.key, next.value))
      }
    }
    case Failure(f) => println(f)
  }
}
After doing all that, it's happy days for me.