Using kafka-streams to conditionally sort a JSON input stream


Problem description

I am new to developing Kafka Streams applications. My stream processor is meant to sort JSON messages based on the value of a user key in the input JSON message.

Message 1: {"UserID": "1", "Score":"123", "meta":"qwert"}
Message 2: {"UserID": "5", "Score":"780", "meta":"mnbvs"}
Message 3: {"UserID": "2", "Score":"0", "meta":"fghjk"}

I have read here, in Dynamically connecting a Kafka input stream to multiple output streams, that there is no dynamic solution.

In my use case I know the user keys and output topics needed to sort the input stream, so I am writing a separate processor application for each user, where each processor application matches a different UserID.

All the different stream processor applications read from the same JSON input topic in Kafka, but each one writes a message to the output topic for a specific user only if its preset user condition is met.

public class SwitchStream extends AbstractProcessor&lt;String, String&gt; {
    @Override
    public void process(String key, String value) {
        HashMap&lt;String, String&gt; message = new HashMap&lt;&gt;();
        ObjectMapper mapper = new ObjectMapper();
        try {
            message = mapper.readValue(value, HashMap.class);
        } catch (IOException e) {
            return; // skip messages that are not valid JSON
        }

        // User condition: UserID = 1
        // (constant on the left avoids a NullPointerException when the field is missing)
        if ("1".equals(message.get("UserID"))) {
            context().forward(key, value);
            context().commit();
        }
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sort-stream-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("Source", "INPUT_TOPIC");
        builder.addProcessor("Process", SwitchStream::new, "Source");
        builder.addSink("Sink", "OUTPUT_TOPIC", "Process");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}
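Because the JSON parse can fail and the UserID field can be missing, the per-user check is worth isolating into a null-safe helper. A minimal, stdlib-only sketch (the `matchesUser` name and the plain `Map` input are illustrative assumptions, not part of the original code):

```java
import java.util.Map;

public class UserMatch {
    // Hypothetical helper: null-safe check that a parsed message belongs to the
    // target user. Putting the constant on the left of equals() avoids a
    // NullPointerException when the UserID field is absent.
    static boolean matchesUser(Map<String, String> message, String targetUserId) {
        if (message == null) {
            return false; // e.g. the JSON parse failed
        }
        return targetUserId.equals(message.get("UserID"));
    }

    public static void main(String[] args) {
        System.out.println(matchesUser(java.util.Collections.singletonMap("UserID", "1"), "1")); // true
        System.out.println(matchesUser(null, "1")); // false
    }
}
```

Each per-user processor application could then call such a helper with its own target UserID instead of hard-coding the comparison inline.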

Question 1: Is it possible to achieve the same functionality easily using the High-Level Streams DSL instead of the Low-Level Processor API? (I admit I found it harder to understand and follow the other online examples of the High-Level Streams DSL.)

Question 2: The input JSON topic is receiving input at a high rate of 20K-25K EPS. My processor applications don't seem to be able to keep pace with this input stream. I have tried deploying multiple instances of each processor, but the results are nowhere close to where I want them to be. Ideally each processor instance should be able to handle 3-5K EPS.

Is there a way to improve my processor logic, or to write the same processor logic using the High-Level Streams DSL? Would that make a difference?

Answer

You can do this in the high-level DSL via filter() (you effectively implemented a filter, as you only forward a message if UserID == 1). You can generalize this filter pattern by using KStream#branch() (see the docs for further details: http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations). Also read the JavaDocs: http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/streams

KStreamBuilder builder = new KStreamBuilder();
builder.stream("INPUT_TOPIC")
       .filter(new Predicate&lt;String, String&gt;() {
           @Override
           public boolean test(String key, String value) {
               // put your processor logic here: parse `value` into `message`
               // with Jackson as in the processor above, then check the user
               return "1".equals(message.get("UserID"));
           }
       })
       .to("OUTPUT_TOPIC");
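The KStream#branch() generalization mentioned above can be sketched as follows: branch() splits one stream into an array of streams, and each record goes to the branch of the first predicate that matches. This is a sketch against the same 0.10.x-era API; the output topic names and the extractUserId() JSON helper are assumptions, not from the original post.

```java
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String>[] branches = builder.<String, String>stream("INPUT_TOPIC")
        .branch(
            (key, value) -> "1".equals(extractUserId(value)),  // records for user 1
            (key, value) -> "2".equals(extractUserId(value)),  // records for user 2
            (key, value) -> true                               // everything else
        );
branches[0].to("OUTPUT_TOPIC_USER1");
branches[1].to("OUTPUT_TOPIC_USER2");
branches[2].to("OUTPUT_TOPIC_OTHER");
```

A single deployment of such a topology routes every record in one pass, instead of running one filtering application per user that each re-reads the whole input topic.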

About performance: a single instance should be able to process 10K+ records. It's hard to tell without any further information what the problem might be. I would recommend asking on the Kafka user list (see http://kafka.apache.org/contact).
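Before deploying more instances (Question 2), two throughput knobs are worth checking: Kafka Streams parallelism is capped by the number of partitions of INPUT_TOPIC, and each instance can run several stream threads. A config fragment for the latter (the thread count of 4 is an arbitrary example):

```java
// Partitions of the input topic are split across all stream threads of all
// instances sharing the same application.id; threads beyond the partition
// count sit idle, so repartitioning the input topic may matter more than
// adding instances.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
```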
