Quarkus中是否有任何功能可以向Kafka发送消息 [英] Is there any function in Quarkus to send message to Kafka

查看:134
本文介绍了Quarkus中是否有任何功能可以向Kafka发送消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是kafka和quarkus的新手,我想在用户请求处理后向kafka主题发送消息.

I'm new to kafka and quarkus, i want to send message to kafka topic when a user request has processed.

我已经看过quarkus-quickstart中提供的kafka示例.我已经尝试过KafkaMessage

I have gone through kafka example provided in quarkus-quickstart. I have tried with KafkaMessage

// when GET called send message to topic
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
    generateSingle();
    return "hello";
}

@Outgoing("single-stations")
public KafkaMessage<Integer, String> generateSingle() {
    return KafkaMessage.of(1, "value");
};

但是我得到的结果是,连续向Kafka主题发送消息.

But i got a result that sending Message to kafka topic continously.

我想知道我的代码还有其他方法吗?

I want to know is there any other method or is there any problem with my code.

感谢帮助

推荐答案

关于该主题的文档目前并不完整(Quarkus 0.25.0).我设法做到了,但是它做了很多实验,我相信这是一种hack,有望在以后的Quarkus版本中得到解决.

The documentation on this topic is terse and incomplete at this time (Quarkus 0.25.0). I managed to do it, but it took a lot of experimentation and something I believe is a hack that hopefully will be remedied in later versions of Quarkus.

原理是 @Outgoing 方法必须产生一个,该流在外部中受到控制.这是通过在 @PostConstruct 方法中通过 Flowable.create()创建流,并将发射器暴露给类成员来实现的. @Outgoing 方法仅返回已构造的流.

The principle is that the @Outgoing method must produce a stream that is controlled externally. This is accomplished by creating the stream through Flowable.create() in a @PostConstruct method, and exposing the emitter to a class member. The @Outgoing method simply returns the already constructed stream.

以下组件公开了一个公共方法 produce(String message),该方法会将文本消息发送给Kafka:

The following component exposes one public method, produce(String message) that will send that text message to Kafka:

package ...

import java.util.UUID;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;

@ApplicationScoped
public class KafkaController {

    private FlowableEmitter<KafkaMessage<String, String>> emitter;

    private Flowable<KafkaMessage<String, String>> outgoingStream;

    @PostConstruct
    void init() {
        outgoingStream = Flowable.create(emitter -> this.emitter = emitter, BackpressureStrategy.BUFFER);
    }

    public void produce(String message) {
        emitter.onNext(KafkaMessage.of(UUID.randomUUID().toString(), message));
    }

    @PreDestroy
    void dispose() {
        emitter.onComplete();
    }

    @Outgoing("internal")
    Publisher<KafkaMessage<String, String>> produceKafkaMessage() {
        return outgoingStream;
    }

    @Incoming("internal")
    @Outgoing("kafka-test")
    KafkaMessage<String, String> transform(Message<KafkaMessage<String, String>> arg) {
        return arg.getPayload();
    }
}

我在生成的Quarkus应用程序中创建了此类,如记录在此处:

I created this class in a generated Quarkus application, as documented here:

mvn io.quarkus:quarkus-maven-plugin:0.25.0:create \
    -DprojectGroupId=org.acme \
    -DprojectArtifactId=kafka-quickstart \
    -Dextensions="kafka"

并如下配置( application.properties ):

kafka.bootstrap.servers=localhost:9092

mp.messaging.outgoing.kafka-test.connector=smallrye-kafka
mp.messaging.outgoing.kafka-test.topic=test
mp.messaging.outgoing.kafka-test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kafka-test.value.serializer=org.apache.kafka.common.serialization.StringSerializer

完全按照 quickstart 中的描述启动Kafka实例.您可以使用控制台侦听器观看 test 主题,如下所示:

A Kafka instance is started exactly as described in the quickstart. You can watch the test topic with a console listener as follows:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic test --from-beginning --group test-console.consumer

要对其进行测试,可以创建一个JAX-RS资源来调用 produce():

To test it, you can create a JAX-RS resource to invoke produce():

package ...

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/control")
public class KafkaProduceControlResource {

    @Inject
    KafkaController kafkaController;

    @POST
    @Path("/produce")
    public void produceMessage(String message) {
        kafkaController.produce(message);
    }
}

按如下所示从命令行调用它,并观察控制台使用者:

Invoke it from command line as follows and watch the console consumer:

curl -i -s -X POST -d "A text message" \
    http://localhost:8080/control/produce

黑客:似乎用 @Outgoing("kafka-test")注释 produceKafkaMessage()失败,因为Quarkus不会了解 KafkaMessage Message ,并将其包装为一个,导致序列化错误.我绕过"internal" 流.

THE HACK: It seems that annotating produceKafkaMessage() with @Outgoing("kafka-test") fails, because Quarkus does NOT understand that a KafkaMessage is a Message, and is wrapping it in one, resulting in serialization errors. I am bypassing this with the "internal" stream.

这篇关于Quarkus中是否有任何功能可以向Kafka发送消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆