Vert.x反应式Kafka客户端:编写时链接不起作用? [英] Vert.x Reactive Kafka client: chaining not working when writing?

查看:456
本文介绍了Vert.x反应式Kafka客户端:编写时链接不起作用?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用io.vertx.reactivex.kafka.client.producer.KafkaProducer客户端.客户有一个 返回Single<RecordMetadata>rxWrite函数.但是,我需要在写操作期间记录错误(如果有).显然它没有被执行.

I am using io.vertx.reactivex.kafka.client.producer.KafkaProducer client. The client has a rxWrite function which returns Single<RecordMetadata>. However I need to log error if any, during write operation. It apparently is not getting executed.

我写了以下工作示例.

test():用于测试链接和日志记录的功能

test(): Function to test the chaining and logging

    fun test(): Single<Int> {
    val data = Single.just(ArrayList<String>().apply {
        add("Hello")
        add("World")
    })

    data.flattenAsObservable<String> { list -> list }
        .flatMap { advertiser ->
       //does not work with writeKafka
            writeError(advertiser).toObservable().doOnError({ println("Error $data") })
        }
        .subscribe({ record -> println(record) }, { e -> println("Error2 $e") })

    return data.map { it.size }
}

writeKafka::将给定的字符串写入Kafka并返回Single

writeKafka: Writes the given given string into Kafka and returns Single

fun writeKafka(param: String): Single<RecordMetadata> {
 //null topic to produce IllegalArgumentException()
    val record = KafkaProducerRecord.create(null, UUID.randomUUID().toString(), param)
    return kafkaProducer.rxWrite(record)
}

writeError:始终返回一个错误且类型相同的错误

writeError: Always return a single with error of same type

fun writeError(param: String): Single<RecordMetadata> {
    return Single.error<RecordMetadata>(IllegalArgumentException())
}

因此,当我呼叫writeKafka时,它只会打印Error2,但是如果我使用writeError,它会同时打印ErrorError2.看来writeKafka返回的单曲仍在等待结果,但是为什么还要打印Error2?

So when I call writeKafka It only prints Error2 but if I use writeError it prints both Error and Error2. Looks like the single returned by writeKafka is still waiting for result, but then why even Error2 is printed?

我是RxJava2的新手,有人可以指出其中的任何错误吗?

I am pretty newbie in RxJava2, could somebody point out any error in that?

推荐答案

读取并发布错误的堆栈跟踪很重要,这样可以隔离问题.

It is important to read and post the stacktrace of errors so that the problem can be isolated.

在这种情况下,您似乎从create获得了IllegalArgumentException,却没有得到任何Single,因为相关的Kafka类

In this case, looks like you get the IllegalArgumentException from create and you don't get any Single because the relevant Kafka class throws it. return kafkaProducer.rxWrite(record) never executes at all and you practically crash the flatMap. doOnError never gets into play hence only the "Error2" is printed.

这篇关于Vert.x反应式Kafka客户端:编写时链接不起作用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆