Print Kafka Stream Input out to console?

Problem description

I've been looking through a lot of the Kafka documentation for a Java application that I am working on. I've tried getting into the lambda syntax introduced in Java 8, but I am a little shaky on it and don't feel confident enough to use it yet.

I have a Kafka/ZooKeeper service running without any trouble, and what I want to do is write a small example program that, based on the input, writes it out. It should not do a word count, as there are already so many examples of that.

As for sample data, I will be getting strings with the following structure:

This is a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.

Question

I want to be able to extract the 3-letter keywords and print them with System.out.println. How do I get a string variable containing the input? I know how to apply regular expressions, or even just search through the string, to get the keywords.

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    final Serde<String> stringSerde = Serdes.String();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    //How do I assign the input from in-stream to the following variable?
    String variable = ?
}

I have ZooKeeper, Kafka, a producer, and a consumer running, all hooked up to the same topic, so I basically want to see the same string appear on all of the instances (producer, consumer, and stream).

Recommended answer

If you use Kafka Streams, you need to apply functions/operators to your data streams. In your case, you created a KStream object, so you want to apply an operator to source.

Depending on what you want to do, there are operators that apply a function to each record in the stream independently (e.g., map()), and other operators that apply a function to multiple records together (e.g., aggregateByKey()). You should have a look at the documentation: http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl and the examples: https://github.com/confluentinc/kafka-streams-examples
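
The difference between per-record and multi-record operators can be illustrated without a Kafka cluster. The sketch below is only an analogy using java.util.stream on an in-memory list of key/value pairs; OperatorKinds and its methods are hypothetical names, not Kafka Streams API. Mapping turns each record into exactly one result on its own, while grouping by key combines several records into one result per key, which is the kind of work an aggregation operator does on a stream.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java analogy only (java.util.stream, NOT the Kafka Streams API).
class OperatorKinds {

    // Like map(): every record is transformed independently, one output per input.
    static List<String> upperCaseValues(List<Map.Entry<String, String>> records) {
        return records.stream()
                .map(r -> r.getValue().toUpperCase(Locale.ROOT))
                .collect(Collectors.toList());
    }

    // Like an aggregation by key: records sharing a key are combined into one result.
    // Note: groupingBy rejects null keys, so this assumes every record has a key.
    static Map<String, Long> countPerKey(List<Map.Entry<String, String>> records) {
        return records.stream()
                .collect(Collectors.groupingBy(Map.Entry::getKey, Collectors.counting()));
    }
}
```

For the records (a, gps), (b, geo), (a, acc), the first method yields one value per record, while the second yields a count of 2 for key a and 1 for key b.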

Thus, with Kafka Streams you never assign the input to a local variable as in your example above; instead, you embed everything in operators/functions that get chained together.

For example, if you want to print all input records to stdout, you could do:

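KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
// Register a per-record action; it runs for every record once streams.start() is called.
source.foreach(new ForeachAction<String, String>() {
    @Override
    public void apply(String key, String value) {  // interface methods must be public
        System.out.println(key + ": " + value);
    }
});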

Thus, after you start your application via streams.start(), it will consume the records from your input topic, and for each record, apply(...) is called, which prints the record to stdout.
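
To make this callback model more concrete (and since the Java 8 lambda syntax came up in the question), here is a tiny in-memory sketch. ToyStream is a hypothetical class invented for illustration and is not part of Kafka Streams; it only mimics the order of events: a per-record action is registered first, and records are pushed to it later, which is why the input never lands in a local variable of main(). The lambda (key, value) -> ... plays the same role as the anonymous ForeachAction above; since ForeachAction has a single abstract method, a lambda should work with source.foreach(...) under Java 8 as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical in-memory stand-in for a stream (NOT Kafka Streams).
// It only models the callback order: register actions first, push records later.
class ToyStream {
    private final List<BiConsumer<String, String>> actions = new ArrayList<>();

    // Similar in spirit to KStream#foreach: stores a per-record callback, runs nothing yet.
    void foreach(BiConsumer<String, String> action) {
        actions.add(action);
    }

    // Stands in for records arriving from the input topic after streams.start().
    // Each record is a two-element array: {key, value}.
    void start(List<String[]> records) {
        for (String[] record : records) {
            for (BiConsumer<String, String> action : actions) {
                action.accept(record[0], record[1]);
            }
        }
    }
}
```

Usage: stream.foreach((key, value) -> System.out.println(key + ": " + value)) prints nothing until start(...) delivers records, mirroring how nothing is printed in the real application before streams.start().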

Of course, a more native way to print the stream to the console would be to use source.print() (which internally is basically the same as the foreach() operator shown above, with a predefined ForeachAction).

For your example of assigning the string to a local variable, you would instead need to put your code into apply(...) and do your regex work there to "extract the 3 letter keywords".

The best way to express this, however, would be via a combination of flatMapValues() and print() (i.e., source.flatMapValues(...).print()). The function you pass to flatMapValues() is called for each input record (in your case, I assume the key will be null, so you can ignore it). Within your flatMapValues function, you apply your regex and, for each match, add the match to a list of values that you finally return.

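source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        ArrayList<String> keywords = new ArrayList<String>();

        // apply regex to value and for each match add it to keywords
        // (assumes a keyword is exactly three uppercase letters, e.g. GPS;
        //  needs java.util.regex.Matcher and java.util.regex.Pattern)
        Matcher matcher = Pattern.compile("\\b[A-Z]{3}\\b").matcher(value);
        while (matcher.find()) {
            keywords.add(matcher.group());
        }

        return keywords;
    }
}).print();  // close flatMapValues(...) and print each keyword record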

The output of flatMapValues will again be a KStream, containing one record for each keyword found (i.e., the output stream is a "union" over all lists you return in ValueMapper#apply()). Finally, you just print the result to the console via print(). (Of course, you could also use a single foreach instead of flatMapValues + print, but this would be less modular.)
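
The "union" semantics can be checked offline with plain Java. This is a minimal in-memory sketch, not the Kafka API: FlatMapValuesDemo and its flatMapValues helper are hypothetical, and the keyword rule (exactly three uppercase letters) is an assumption based on the sample string. Each input value yields zero or more keywords, and the output is the concatenation of all returned lists in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// In-memory illustration of flatMapValues semantics (NOT Kafka Streams).
class FlatMapValuesDemo {

    // Assumption: a keyword is exactly three uppercase letters (GPS, GEO, ACC).
    private static final Pattern KEYWORD = Pattern.compile("\\b[A-Z]{3}\\b");

    // Plays the role of ValueMapper#apply: one value in, a (possibly empty) list out.
    static List<String> extractKeywords(String value) {
        List<String> keywords = new ArrayList<>();
        Matcher m = KEYWORD.matcher(value);
        while (m.find()) {
            keywords.add(m.group());
        }
        return keywords;
    }

    // Hypothetical helper: applies the mapper to every value and flattens the results.
    static <V, VR> List<VR> flatMapValues(List<V> values, Function<V, ? extends Iterable<VR>> mapper) {
        List<VR> out = new ArrayList<>();
        for (V value : values) {
            for (VR result : mapper.apply(value)) {
                out.add(result);
            }
        }
        return out;
    }
}
```

Feeding it the sample sentence, "no keywords here", and "only ACC" produces GPS, GEO, ACC, ACC: the second input contributes no records, and each keyword becomes its own output record, just as each one would be printed separately by print() in the stream version.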