Vert.x反应式Kafka客户端:编写时链接不起作用? [英] Vert.x Reactive Kafka client: chaining not working when writing?
问题描述
我正在使用io.vertx.reactivex.kafka.client.producer.KafkaProducer
客户端.客户有一个
返回Single<RecordMetadata>
的rxWrite
函数.但是,我需要在写操作期间记录错误(如果有).显然它没有被执行.
I am using io.vertx.reactivex.kafka.client.producer.KafkaProducer
client. The client has a
rxWrite
function which returns Single<RecordMetadata>
. However I need to log error if any, during write operation. It apparently is not getting executed.
我写了以下工作示例.
test():用于测试链接和日志记录的功能
test(): Function to test the chaining and logging
fun test(): Single<Int> {
val data = Single.just(ArrayList<String>().apply {
add("Hello")
add("World")
})
data.flattenAsObservable<String> { list -> list }
.flatMap { advertiser ->
//does not work with writeKafka
writeError(advertiser).toObservable().doOnError({ println("Error $data") })
}
.subscribe({ record -> println(record) }, { e -> println("Error2 $e") })
return data.map { it.size }
}
writeKafka::将给定的字符串写入Kafka并返回Single
writeKafka: Writes the given given string into Kafka and returns Single
fun writeKafka(param: String): Single<RecordMetadata> {
//null topic to produce IllegalArgumentException()
val record = KafkaProducerRecord.create(null, UUID.randomUUID().toString(), param)
return kafkaProducer.rxWrite(record)
}
writeError:始终返回一个错误且类型相同的错误
writeError: Always return a single with error of same type
fun writeError(param: String): Single<RecordMetadata> {
return Single.error<RecordMetadata>(IllegalArgumentException())
}
因此,当我呼叫writeKafka
时,它只会打印Error2
,但是如果我使用writeError
,它会同时打印Error
和Error2
.看来writeKafka
返回的单曲仍在等待结果,但是为什么还要打印Error2
?
So when I call writeKafka
It only prints Error2
but if I use writeError
it prints both Error
and Error2
. Looks like the single returned by writeKafka
is still waiting for result, but then why even Error2
is printed?
我是RxJava2的新手,有人可以指出其中的任何错误吗?
I am pretty newbie in RxJava2, could somebody point out any error in that?
推荐答案
读取并发布错误的堆栈跟踪很重要,这样可以隔离问题.
It is important to read and post the stacktrace of errors so that the problem can be isolated.
在这种情况下,您似乎从create
获得了IllegalArgumentException
,却没有得到任何Single
,因为相关的Kafka类
In this case, looks like you get the IllegalArgumentException
from create
and you don't get any Single
because the relevant Kafka class throws it. return kafkaProducer.rxWrite(record)
never executes at all and you practically crash the flatMap
. doOnError
never gets into play hence only the "Error2" is printed.
这篇关于Vert.x反应式Kafka客户端:编写时链接不起作用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!