从 Kafka 获取消息,发送到 Rsocket 并从 React 客户端接收它 [英] Get Message From Kafka, send to Rsocket and Receive it from React client

查看:31
本文介绍了从 Kafka 获取消息,发送到 Rsocket 并从 React 客户端接收它的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 Spring 云流将数据从 kafka 发送到 Rsocket,然后在 React 上表示数据

I am trying to send data from kafka using Spring cloud stream to Rsocket and then represent data on React

这是我的配置.

@Configuration
public class RsocketConsumerConfiguration {
    
    @Bean
    public Sinks.Many<Data> sender(){
        return Sinks.many().multicast().directBestEffort();
    }
    

}

@控制器公共类 ServerController {

@Controller public class ServerController {

@Autowired
private Sinks.Many<Data> integer;

@MessageMapping("integer")
public Flux<Data> integer() {
    return  integer.asFlux();
}

@EnableBinding(IClientProcessor.class)
public class Listener {

    @Autowired
    private Sinks.Many<Data> integer;

    @StreamListener(IClientProcessor.INTEGER)
    public void integer(Data val) {
        System.out.println(val);
        integer.tryEmitNext(val);
    }

}

   let  client = new RSocketClient({
    transport: new RSocketWebSocketClient(
        {
            url: 'ws://localhost:7000/ws',
            wsCreator: (url) => new WebSocket(url),
            debug: true,
        },
        BufferEncoders,
    ),
    setup: {
        dataMimeType: "application/json",
        metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
        keepAlive: 5000,
        lifetime: 60000,
    },
});

  client
            .then(rsocket => {
                console.log("Connected to rsocket");
                rsocket.requestStream({
                    metadata: Buffer.from(encodeCompositeMetadata([
                        [MESSAGE_RSOCKET_ROUTING, encodeRoute("integer")],
                    ])),
                 
                })
                    .subscribe({
                        onSubscribe: s => {
                            s.request(2147483647)
                        },
                        onNext: (p) => {
                            let newData = {
                                time: new Date(JSON.parse(p.data).time).getUTCSeconds(),
                                integer: JSON.parse(p.data).integer
                            }
                           newData.integer >100?setInteger(currentData => [newData, ...currentData]):setInt(currentData => [newData, ...currentData])
                           console.log(newData)
                        },
                        onError: (e) => console.error(e),
                        onComplete: () => console.log("Done")
                    });

spring.cloud.stream.bindings.integer.destination=整数无法在反应应用程序中看到它.请指教.我做错了什么?

spring.cloud.stream.bindings.integer.destination=integer Not able to see it in react app. Please advise. What I am doing wrong?

推荐答案

鉴于数据似乎直接从 Kafka(通过 Spring)传输到客户端,也许 通过互联网消息代理将 Kafka 消息通过 WebSockets 流式传输到面向互联网的客户端.

Given the data appears to be going directly from Kafka (via Spring) to the client, perhaps it would make more sense to stream Kafka messages via an internet-messaging broker to Internet-facing clients over WebSockets.

披露:我不是那篇文章的作者,但在作者工作的那家公司工作.我们经常看到这个用例,所以希望这种方法可能有用.

Disclosure: I am not the author of that article, but do work at that company where the author works. We see this use case frequently, so expect this approach may be useful.

这篇关于从 Kafka 获取消息,发送到 Rsocket 并从 React 客户端接收它的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆