Print Kafka Stream Input out to console?


Problem description

I've been reading through a lot of the Kafka documentation for a Java application that I am working on. I've tried getting into the lambda syntax introduced in Java 8, but I am still shaky on it and not yet confident that it is what I should be using.

I have a Kafka/ZooKeeper service running without any trouble. What I want to do is write a small example program that writes out its input, but not a word count, as there are already so many examples of that.

As for sample data, I will be getting a string of the following structure:

This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.



Question

I want to extract the 3-letter keywords and print them with System.out.println. How do I get a String variable containing the input? I know how to apply regular expressions, or even just search through the string, to get the keywords.

public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092");
    props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

    final Serde<String> stringSerde = Serdes.String();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");

    KafkaStreams streams = new KafkaStreams(builder, props);
    streams.start();

    //How do I assign the input from in-stream to the following variable?
    String variable = ?
}

I have ZooKeeper, Kafka, a producer, and a consumer running, all hooked up to the same topic, so I basically want to see the same String appear on all of the instances (producer, consumer, and stream).

Recommended answer

If you use Kafka Streams, you need to apply functions/operators to your data streams. In your case, you create a KStream object, so you want to apply an operator to source.

Depending on what you want to do, there are operators that apply a function to each record in the stream independently (e.g., map()), and other operators that apply a function to multiple records together (e.g., aggregateByKey()). You should have a look at the documentation: http://docs.confluent.io/3.0.0/streams/developer-guide.html#kafka-streams-dsl and the examples: https://github.com/confluentinc/examples/tree/kafka-0.10.0.0-cp-3.0.0/kafka-streams

Thus, with Kafka Streams you never assign the input to a local variable as you show in your example above; instead, you embed everything in operators/functions that get chained together.

For example, if you want to print all input records to stdout, you could do:

KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
// apply(...) is invoked once for every record read from "in-stream".
source.foreach(new ForeachAction<String, String>() {
    @Override
    public void apply(String key, String value) {
        System.out.println(key + ": " + value);
    }
});

Thus, after you start your application via streams.start(), it will consume the records from your input topic, and for each record, apply(...) is called, which prints the record to stdout.
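Since the question mentions being unsure about Java 8 lambdas: an anonymous class with a single method, like the ForeachAction above, can also be written as a lambda. The following is only a minimal sketch that runs without Kafka on the classpath. The nested ForeachAction interface and the forEachRecord and format helpers are hypothetical stand-ins declared here for illustration; they are not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

public class ForeachLambdaSketch {

    // Hypothetical stand-in with the same shape as the ForeachAction used above,
    // declared locally so this sketch compiles without the Kafka dependency.
    public interface ForeachAction<K, V> {
        void apply(K key, V value);
    }

    // Hypothetical helper: invokes the action once per {key, value} pair,
    // mimicking "apply(...) is called for each record".
    public static void forEachRecord(List<String[]> records,
                                     ForeachAction<String, String> action) {
        for (String[] record : records) {
            action.apply(record[0], record[1]);
        }
    }

    // Collects "key: value" lines using a lambda instead of an anonymous class.
    public static List<String> format(List<String[]> records) {
        List<String> lines = new ArrayList<>();
        forEachRecord(records, (key, value) -> lines.add(key + ": " + value));
        return lines;
    }

    public static void main(String[] args) {
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {null, "GPS fix acquired"});

        // Anonymous-class form, as in the answer.
        forEachRecord(records, new ForeachAction<String, String>() {
            @Override
            public void apply(String key, String value) {
                System.out.println(key + ": " + value);
            }
        });

        // Equivalent lambda form.
        forEachRecord(records, (key, value) -> System.out.println(key + ": " + value));
    }
}
```

Both calls in main print the same line. Assuming the real ForeachAction interface likewise declares a single apply(key, value) method, the lambda form should be usable with source.foreach(...) in the same way.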

Of course, a more native way to print the stream to the console would be to use source.print() (which internally is basically the same as the foreach() operator shown above, with a predefined ForeachAction).

For your example of assigning the string to a local variable, you would need to put your code into apply(...) and do your regex work there to "extract the 3 letter keywords".

The best way to express this, however, would be a combination of flatMapValues() and print() (i.e., source.flatMapValues(...).print()). flatMapValues() is called for each input record (in your case, I assume the key will be null, so you can ignore it). Within your flatMapValues() function, you apply your regex, and for each match, you add the match to a list of values that you finally return.

source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
    @Override
    public Iterable<String> apply(String value) {
        ArrayList<String> keywords = new ArrayList<String>();

        // apply regex to value and for each match add it to keywords

        return keywords;
    }
}).print(); // print each extracted keyword as its own record
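The "apply regex" comment in the snippet above could be filled in roughly as follows. This is a sketch in plain Java with no Kafka dependency. It assumes a keyword is exactly three uppercase ASCII letters standing as a whole word, which matches the sample string in the question but is not stated there as a rule. The class name KeywordExtractor and the method extractKeywords are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KeywordExtractor {

    // Assumption: a keyword is exactly three uppercase ASCII letters
    // delimited by word boundaries (e.g. GPS, GEO, ACC).
    private static final Pattern KEYWORD = Pattern.compile("\\b[A-Z]{3}\\b");

    // Returns all keywords found in value, in order of appearance.
    public static List<String> extractKeywords(String value) {
        List<String> keywords = new ArrayList<>();
        if (value == null) {
            return keywords; // nothing to extract from a null value
        }
        Matcher matcher = KEYWORD.matcher(value);
        while (matcher.find()) {
            keywords.add(matcher.group());
        }
        return keywords;
    }

    public static void main(String[] args) {
        String sample = "This a sample string containing some keywords "
                + "such as GPS, GEO and maybe a little bit of ACC.";
        System.out.println(extractKeywords(sample)); // prints [GPS, GEO, ACC]
    }
}
```

Inside ValueMapper#apply(String value), the body could then simply return KeywordExtractor.extractKeywords(value), which keeps the regex logic testable on its own.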

The output of flatMapValues() will again be a KStream, containing one record for each keyword found (i.e., the output stream is the "union" of all the lists you return from ValueMapper#apply()). Finally, you just print the result to the console via print(). (Of course, you could also use a single foreach() instead of flatMapValues() + print(), but this would be less modular.)
