How to make multiple logs sync in kafka?


Question

Suppose I have two types of logs that share a common field 'uid'. When logs containing the same uid have arrived from both types, I want to output them together, like a join. Is this possible with Kafka?

Answer

Yes, absolutely. Check out Kafka Streams, specifically the DSL API. It goes something like:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.JoinWindows;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();

    // One stream per input topic; records on both topics are keyed by uid.
    KStream<byte[], Foo> fooStream = builder.stream("foo");
    KStream<byte[], Bar> barStream = builder.stream("bar");

    // Inner join: a result is emitted only when both streams contain a
    // record with the same key within the join window.
    fooStream.join(barStream,
                   (foo, bar) -> {
                       foo.baz = bar.baz;
                       return foo;
                   },
                   JoinWindows.of(1000))
             .to("buzz");

This simple application consumes two input topics ("foo" and "bar"), joins them, and writes the result to the topic "buzz". Since streams are infinite, when joining two streams you need to specify a join window (1000 milliseconds above): the maximum relative time difference between two messages on the respective streams for them to be eligible for joining.
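The snippet above only builds the topology; to actually run it, you wrap it in a KafkaStreams instance. Here is a minimal sketch, assuming a broker at localhost:9092 and the application id "log-join-app" (both placeholders), with default serdes matching your Foo and Bar types configured as appropriate:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "log-join-app");      // placeholder id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
    // Default key/value serdes for Foo and Bar would be configured here as well.

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
    // Close cleanly on shutdown.
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));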

Here is a more complete example: https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java

And here is the documentation: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html. You'll find there are many different kinds of joins you can perform (one of them, a KStream-KTable join, is sketched after the links below):

  • https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KStream.html
  • https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KTable.html
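For instance, a KStream-KTable join enriches each stream record with the latest table value for its key and, unlike a KStream-KStream join, needs no window. A minimal self-contained sketch with hypothetical topics "events", "users", and "enriched-events", using String keys and values for brevity:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;

    StreamsBuilder builder = new StreamsBuilder();

    // A stream of events and a changelog-backed table holding the latest
    // user record per key (the key is shared, e.g. a uid).
    KStream<String, String> events = builder.stream("events");
    KTable<String, String> users = builder.table("users");

    // Each event is joined against the current table value for its key.
    events.join(users, (event, user) -> event + " by " + user)
          .to("enriched-events");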

It is important to note that while the above example will deterministically synchronize streams (if you reset and reprocess the topology, you will get the same result each time), not all join operations in Kafka Streams are deterministic. As of version 1.0.0, roughly half of them are not deterministic and may depend on the order in which data is consumed from the underlying topic-partitions. Specifically, inner KStream-KStream joins and all KTable-KTable joins are deterministic. Other joins, such as all KStream-KTable joins and left/outer KStream-KStream joins, are non-deterministic and depend on the order of data consumed by the consumers.

Keep this in mind if you are designing your topology to be reprocessable: with these non-deterministic operations, the order in which events arrive while the topology runs live will produce one result, but if you reprocess the topology you may get another. Note also that operations like KStream#merge() do not produce deterministic results either. For more on this problem, see Why does my Kafka Streams topology does not replay/reprocess correctly? and this mailing list post.
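To make the order dependence concrete, here is a sketch of a left KStream-KStream join, reusing fooStream and barStream from the example above ("buzz-left" is a hypothetical output topic). When a foo is consumed before its matching bar, the joiner may be invoked with bar == null, so a live run and a reprocessing run can produce different output:

    // Left join: every foo produces output, with or without a bar.
    fooStream.leftJoin(barStream,
                       (foo, bar) -> {
                           if (bar != null) {
                               foo.baz = bar.baz; // matched within the window
                           }
                           return foo;            // bar may be null here
                       },
                       JoinWindows.of(1000))
             .to("buzz-left");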

