Quarkus中是否有任何功能可以向Kafka发送消息 [英] Is there any function in Quarkus to send message to Kafka
问题描述
我是kafka和quarkus的新手,我想在用户请求处理后向kafka主题发送消息.
I'm new to kafka and quarkus, i want to send message to kafka topic when a user request has processed.
我已经看过quarkus-quickstart中提供的kafka示例.我已经尝试过KafkaMessage
I have gone through kafka example provided in quarkus-quickstart. I have tried with KafkaMessage
// when GET called send message to topic
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
generateSingle();
return "hello";
}
@Outgoing("single-stations")
public KafkaMessage<Integer, String> generateSingle() {
return KafkaMessage.of(1, "value");
};
但是我得到的结果是,连续向Kafka主题发送消息.
But i got a result that sending Message to kafka topic continously.
我想知道我的代码还有其他方法吗?
I want to know is there any other method or is there any problem with my code.
感谢帮助
推荐答案
关于该主题的文档目前并不完整(Quarkus 0.25.0).我设法做到了,但是它做了很多实验,我相信这是一种hack,有望在以后的Quarkus版本中得到解决.
The documentation on this topic is terse and incomplete at this time (Quarkus 0.25.0). I managed to do it, but it took a lot of experimentation and something I believe is a hack that hopefully will be remedied in later versions of Quarkus.
原理是 @Outgoing
方法必须产生一个流,该流在外部中受到控制.这是通过在 @PostConstruct
方法中通过 Flowable.create()
创建流,并将发射器暴露给类成员来实现的. @Outgoing
方法仅返回已构造的流.
The principle is that the @Outgoing
method must produce a stream that is controlled externally. This is accomplished by creating the stream through Flowable.create()
in a @PostConstruct
method, and exposing the emitter to a class member. The @Outgoing
method simply returns the already constructed stream.
以下组件公开了一个公共方法 produce(String message)
,该方法会将文本消息发送给Kafka:
The following component exposes one public method, produce(String message)
that will send that text message to Kafka:
package ...
import java.util.UUID;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.smallrye.reactive.messaging.kafka.KafkaMessage;
@ApplicationScoped
public class KafkaController {
private FlowableEmitter<KafkaMessage<String, String>> emitter;
private Flowable<KafkaMessage<String, String>> outgoingStream;
@PostConstruct
void init() {
outgoingStream = Flowable.create(emitter -> this.emitter = emitter, BackpressureStrategy.BUFFER);
}
public void produce(String message) {
emitter.onNext(KafkaMessage.of(UUID.randomUUID().toString(), message));
}
@PreDestroy
void dispose() {
emitter.onComplete();
}
@Outgoing("internal")
Publisher<KafkaMessage<String, String>> produceKafkaMessage() {
return outgoingStream;
}
@Incoming("internal")
@Outgoing("kafka-test")
KafkaMessage<String, String> transform(Message<KafkaMessage<String, String>> arg) {
return arg.getPayload();
}
}
我在生成的Quarkus应用程序中创建了此类,如记录在此处:
I created this class in a generated Quarkus application, as documented here:
mvn io.quarkus:quarkus-maven-plugin:0.25.0:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=kafka-quickstart \
-Dextensions="kafka"
并如下配置( application.properties
):
kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.kafka-test.connector=smallrye-kafka
mp.messaging.outgoing.kafka-test.topic=test
mp.messaging.outgoing.kafka-test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.kafka-test.value.serializer=org.apache.kafka.common.serialization.StringSerializer
完全按照 quickstart 中的描述启动Kafka实例.您可以使用控制台侦听器观看 test
主题,如下所示:
A Kafka instance is started exactly as described in the quickstart. You can watch the test
topic with a console listener as follows:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic test --from-beginning --group test-console.consumer
要对其进行测试,可以创建一个JAX-RS资源来调用 produce()
:
To test it, you can create a JAX-RS resource to invoke produce()
:
package ...
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@Path("/control")
public class KafkaProduceControlResource {
@Inject
KafkaController kafkaController;
@POST
@Path("/produce")
public void produceMessage(String message) {
kafkaController.produce(message);
}
}
按如下所示从命令行调用它,并观察控制台使用者:
Invoke it from command line as follows and watch the console consumer:
curl -i -s -X POST -d "A text message" \
http://localhost:8080/control/produce
黑客:似乎用 @Outgoing("kafka-test")
注释 produceKafkaMessage()
失败,因为Quarkus不会了解 KafkaMessage
是 Message ,并将其包装为一个,导致序列化错误.我绕过"internal"
流.
THE HACK: It seems that annotating produceKafkaMessage()
with @Outgoing("kafka-test")
fails, because Quarkus does NOT understand that a KafkaMessage
is a Message
, and is wrapping it in one, resulting in serialization errors. I am bypassing this with the "internal"
stream.
这篇关于Quarkus中是否有任何功能可以向Kafka发送消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!