How to configure the bindings of a function in Spring Cloud Stream to bind its input to a web endpoint and its output to a Kafka topic
Question
I have a regular Java Function, which I am trying to bind:
- its input to a web endpoint
- its output to a Kafka topic.
When I use my function in a web context, it always returns the Function's result to the web client only. Can I do something like this?
spring.cloud.stream.bindings.input.binder=web
spring.cloud.stream.bindings.output.binder=kafka
I am currently even trying to split the Function into 2:
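- one function whose input is bound to the web client and whose output is dynamically bound to the second function (using spring.cloud.stream.sendto.destination)
- another function whose output is bound to the Kafka binding.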
This approach doesn't work either. The dynamic routing header (spring.cloud.stream.sendto.destination) shows up back on the web client, but no Message is sent to the Kafka binding itself. Here is the code I am using in this second approach (2 functions), in the hope of simply getting a Spring functional app to bind its input to a web endpoint and its output to a Kafka topic.
WebToKafkaApp.java
@SpringBootApplication
public class WebToKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(WebToKafkaApp.class, args);
    }

    @Bean
    public Function<String, Message<String>> webFunction() {
        return payload -> createPayloadMapperToMessage("kafkaFunction").apply(payload);
    }

    @Bean
    public Function<Flux<Message<String>>, Flux<Message<String>>> kafkaFunction() {
        return flux -> flux.map(msg -> createPayloadMapperToMessage("").apply(msg.getPayload()));
    }

    private Function<String, Message<String>> createPayloadMapperToMessage(String destination) {
        return payload -> MessageBuilder
                .withPayload(payload.toUpperCase())
                .setHeader("spring.cloud.stream.sendto.destination", destination)
                .build();
    }
}
application.yml
spring.cloud.stream.bindings.webFunction-in-0:
  destination: webFunctionIN
  contentType: application/json
spring.cloud.stream.bindings.webFunction-out-0:
  destination: webFunctionOUT
  contentType: application/json
spring.cloud.stream.bindings.kafkaFunction-in-0:
  destination: kafkaFunctionIN
  contentType: application/json
  binder: kafka
spring.cloud.stream.bindings.kafkaFunction-out-0:
  destination: kafkaFunctionOUT
  contentType: application/json
  binder: kafka
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
spring.cloud.stream.function.routing.enabled: true
spring.cloud.function.definition: webFunction
build.gradle
plugins {
    id 'org.springframework.boot' version '2.2.1.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "Hoxton.RELEASE")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.cloud:spring-cloud-starter-function-web'
    implementation 'org.springframework.cloud:spring-cloud-starter-function-webflux'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}
Any help would be appreciated.
Recommended answer
Thanks to Oleg for posting the idea behind this solution. Essentially, I enhanced his proposal to generically handle a bridge between:
- a functional web controller, which can receive web requests.
- a stream supplier, which can forward any message to a messaging infrastructure.
This solution encapsulates the concerns described in Oleg's example inside a custom implementation of Supplier. The implementation exposes an API that triggers the Supplier to emit a message passed as a parameter. Such a class would look like the following:
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.function.Supplier;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

public class StreamSupplier implements Supplier<Flux<?>> {

    private static final String SPRING_CLOUD_STREAM_SENDTO_DESTINATION =
            "spring.cloud.stream.sendto.destination";

    public static <T> Message<?> createMessage(T payload, String destination) {
        MessageBuilder<T> builder = MessageBuilder.withPayload(payload);
        if (destination != null && !destination.isEmpty()) {
            builder.setHeader(SPRING_CLOUD_STREAM_SENDTO_DESTINATION, destination);
        }
        return builder.build();
    }

    private String defaultDestination;
    private EmitterProcessor<? super Object> processor = EmitterProcessor.create();

    public StreamSupplier() {
        this(null);
    }

    public StreamSupplier(String defaultDestination) {
        this.defaultDestination = defaultDestination;
    }

    // SEND APIs

    public <T> Message<?> sendMessage(T payload) {
        return sendMessage(payload, defaultDestination);
    }

    public <T> Message<?> sendMessage(T payload, String destination) {
        return sendBody(createMessage(payload, destination));
    }

    public <T> T sendBody(T body) {
        processor.onNext(body);
        return body;
    }

    /**
     * Returns {@link EmitterProcessor} used internally to programmatically publish messages onto
     * the output binding associated with this {@link Supplier}. Such programmatic publications
     * are available through the {@code sendXXX} API methods available in this class.
     */
    @Override
    public Flux<?> get() {
        return processor;
    }
}
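The bridging idea in the class above can be tried without Spring or Reactor. The following is only a dependency-free sketch of the pattern, not the answer's implementation: BridgeSketch, Channel and PushSupplier are hypothetical names, and Channel is a simplified stand-in for the EmitterProcessor (it delivers items synchronously to whoever has subscribed).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical, dependency-free analogue of StreamSupplier (no Spring, no Reactor). */
public class BridgeSketch {

    /** Simplified stand-in for a hot stream: subscribers see every item published after they subscribe. */
    static final class Channel<T> {
        private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<? super T> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(T item) {
            subscribers.forEach(s -> s.accept(item));
        }
    }

    /** Supplier that hands out its channel and lets other code push items into it later. */
    static final class PushSupplier<T> implements Supplier<Channel<T>> {
        private final Channel<T> channel = new Channel<>();

        T send(T body) {
            channel.publish(body);
            return body; // echoed back to the caller, like sendBody in StreamSupplier
        }

        @Override
        public Channel<T> get() {
            return channel;
        }
    }

    public static void main(String[] args) {
        PushSupplier<String> supplier = new PushSupplier<>();

        // Plays the role of the output binding: it calls get() and subscribes.
        List<String> forwarded = new CopyOnWriteArrayList<>();
        supplier.get().subscribe(forwarded::add);

        // Plays the role of the web function: forward the body, return it to the client.
        Function<String, String> webFunction = supplier::send;

        String response = webFunction.apply("hello");
        System.out.println("response=" + response + ", forwarded=" + forwarded);
    }
}
```

In this sketch the caller still gets its own payload back as the response, while the same payload is also delivered to the subscriber, which is the behaviour the question was after.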
Then a developer only has to:
- Register an instance of this particular Supplier implementation as a bean in a Spring application, and let spring-cloud-function scan this bean into the FunctionCatalog.
- Create a web function that forwards any message to a streaming infrastructure using the previously registered Supplier, which can be configured using all the bells and whistles of spring-cloud-stream.
The following example demonstrates this:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Controller;

import java.util.function.Function;
import java.util.function.Supplier;

import reactor.core.publisher.Flux;

@SpringBootApplication
@Controller
public class MyApp {

    public static void main(String[] args) {
        SpringApplication.run(MyApp.class,
                "--spring.cloud.function.definition=streamSupplierFunction;webToStreamFunction");
    }

    // Functional Web Controller
    @Bean
    public Function<String, String> webToStreamFunction() {
        return msg -> streamSupplier().sendBody(msg);
    }

    // Functional Stream Supplier
    @Bean
    public Supplier<Flux<?>> streamSupplierFunction() {
        return new StreamSupplier();
    }

    // DOUBLE REGISTRATION TO AVOID POLLABLE CONFIGURATION
    // LIMITATION OF SPRING-CLOUD-FUNCTION
    @Bean
    public StreamSupplier streamSupplier() {
        return (StreamSupplier) streamSupplierFunction();
    }
}
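To exercise the application above, a client can post a payload to the web function. The sketch below is an illustration under assumptions that the answer does not state: that the app listens on http://localhost:8080 and that spring-cloud-function-web exposes the function at a path matching its bean name (/webToStreamFunction). WebToStreamClient and buildRequest are hypothetical names, and the client needs Java 11 or later for java.net.http.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client for the web function; host, port and path are assumptions. */
public class WebToStreamClient {

    /** Builds a plain-text POST to baseUrl/functionName carrying the given payload. */
    static HttpRequest buildRequest(String baseUrl, String functionName, String payload) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + functionName))
                .header("Content-Type", "text/plain")
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumed defaults: Spring Boot's port 8080 and the bean name as the path.
        HttpRequest request =
                buildRequest("http://localhost:8080", "webToStreamFunction", "hello");

        // Requires the application (and its Kafka broker) to be running.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

If the assumptions hold, the response body should echo the payload returned by sendBody, and the same payload should appear on whichever destination the streamSupplierFunction output binding is configured to use.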
Again, I want to thank Oleg for providing the details I needed to build this comprehensive solution.