Reading peek topic from Kafka Streams

Problem description

I have a topic named push-processing-KSTREAM-PEEK-0000000014-repartition, which is an internal topic created by Kafka. I did not create this topic. I use the .peek() method after the repartition, and I use peek 3-4 times in total.

My question is: I can read from the topic with "topic read push-processing-KSTREAM-PEEK-0000000014-repartition", but I cannot read it when I say "topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning".

Is this internal topic created because of the peek method?

Or is it related to other repartitioning code in my streams, even though its name contains KSTREAM-PEEK?

It has 50 partitions. Since peek is a stateless operation, it should not create internal topics, right? So why is the name related to peek, and why can I not read it from the beginning?

Any ideas, please?

This is the first topology:

   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [appconnect_deviceIds_exported_for_push])
      --> KSTREAM-FLATMAP-0000000004
    Processor: KSTREAM-FLATMAP-0000000004 (stores: [])
      --> KSTREAM-PEEK-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-PEEK-0000000005 (stores: [])
      --> KSTREAM-FILTER-0000000007
      <-- KSTREAM-FLATMAP-0000000004
    Processor: KSTREAM-FILTER-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000006
      <-- KSTREAM-PEEK-0000000005
    Sink: KSTREAM-SINK-0000000006 (topic: KSTREAM-PEEK-0000000005-repartition)
      <-- KSTREAM-FILTER-0000000007

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000008 (topics: [KSTREAM-PEEK-0000000005-repartition])
      --> KSTREAM-JOIN-0000000009
    Source: KSTREAM-SOURCE-0000000028 (topics: [KSTREAM-PEEK-0000000025-repartition])
      --> KSTREAM-JOIN-0000000029
    Processor: KSTREAM-JOIN-0000000009 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> KSTREAM-MAP-0000000010
      <-- KSTREAM-SOURCE-0000000008
    Processor: KSTREAM-JOIN-0000000029 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> KSTREAM-PEEK-0000000030
      <-- KSTREAM-SOURCE-0000000028
    Processor: KSTREAM-MAP-0000000010 (stores: [])
      --> KSTREAM-PEEK-0000000011
      <-- KSTREAM-JOIN-0000000009
    Processor: KSTREAM-PEEK-0000000030 (stores: [])
      --> KSTREAM-MAP-0000000031
      <-- KSTREAM-JOIN-0000000029
    Processor: KSTREAM-MAP-0000000031 (stores: [])
      --> KSTREAM-SINK-0000000032
      <-- KSTREAM-PEEK-0000000030
    Processor: KSTREAM-PEEK-0000000011 (stores: [])
      --> KSTREAM-SINK-0000000012
      <-- KSTREAM-MAP-0000000010
    Source: KSTREAM-SOURCE-0000000002 (topics: [appconnect_device_stream])
      --> KTABLE-SOURCE-0000000003
    Sink: KSTREAM-SINK-0000000012 (topic: appconnect_devices_exported_for_push)
      <-- KSTREAM-PEEK-0000000011
    Sink: KSTREAM-SINK-0000000032 (topic: appconnect_devices_exported_for_push)
      <-- KSTREAM-MAP-0000000031
    Processor: KTABLE-SOURCE-0000000003 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
      --> none
      <-- KSTREAM-SOURCE-0000000002

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000013 (topics: [appconnect_userIds_exported_for_push])
      --> KSTREAM-FLATMAP-0000000017
    Processor: KSTREAM-FLATMAP-0000000017 (stores: [])
      --> KSTREAM-PEEK-0000000018
      <-- KSTREAM-SOURCE-0000000013
    Processor: KSTREAM-PEEK-0000000018 (stores: [])
      --> KSTREAM-FILTER-0000000020
      <-- KSTREAM-FLATMAP-0000000017
    Processor: KSTREAM-FILTER-0000000020 (stores: [])
      --> KSTREAM-SINK-0000000019
      <-- KSTREAM-PEEK-0000000018
    Sink: KSTREAM-SINK-0000000019 (topic: KSTREAM-PEEK-0000000018-repartition)
      <-- KSTREAM-FILTER-0000000020

  Sub-topology: 3
    Source: KSTREAM-SOURCE-0000000021 (topics: [KSTREAM-PEEK-0000000018-repartition])
      --> KSTREAM-JOIN-0000000022
    Processor: KSTREAM-JOIN-0000000022 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
      --> KSTREAM-PEEK-0000000023
      <-- KSTREAM-SOURCE-0000000021
    Processor: KSTREAM-PEEK-0000000023 (stores: [])
      --> KSTREAM-MAP-0000000024
      <-- KSTREAM-JOIN-0000000022
    Processor: KSTREAM-MAP-0000000024 (stores: [])
      --> KSTREAM-PEEK-0000000025
      <-- KSTREAM-PEEK-0000000023
    Processor: KSTREAM-PEEK-0000000025 (stores: [])
      --> KSTREAM-FILTER-0000000027
      <-- KSTREAM-MAP-0000000024
    Processor: KSTREAM-FILTER-0000000027 (stores: [])
      --> KSTREAM-SINK-0000000026
      <-- KSTREAM-PEEK-0000000025
    Source: KSTREAM-SOURCE-0000000015 (topics: [appconnect_user_stream])
      --> KTABLE-SOURCE-0000000016
    Sink: KSTREAM-SINK-0000000026 (topic: KSTREAM-PEEK-0000000025-repartition)
      <-- KSTREAM-FILTER-0000000027
    Processor: KTABLE-SOURCE-0000000016 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
      --> none
      <-- KSTREAM-SOURCE-0000000015

And this is the second one:

   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000017 (topics: [KSTREAM-PEEK-0000000014-repartition])
      --> KSTREAM-JOIN-0000000018
    Processor: KSTREAM-JOIN-0000000018 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
      --> KSTREAM-FILTER-0000000019
      <-- KSTREAM-SOURCE-0000000017
    Processor: KSTREAM-FILTER-0000000019 (stores: [])
      --> KSTREAM-SINK-0000000020
      <-- KSTREAM-JOIN-0000000018
    Source: KSTREAM-SOURCE-0000000001 (topics: [appconnect_push_processing_submissions])
      --> KTABLE-SOURCE-0000000002
    Sink: KSTREAM-SINK-0000000020 (topic: appconnect_push_send_bulk)
      <-- KSTREAM-FILTER-0000000019
    Processor: KTABLE-SOURCE-0000000002 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
      --> none
      <-- KSTREAM-SOURCE-0000000001

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000003 (topics: [appconnect_devices_exported_for_push])
      --> KSTREAM-MAP-0000000007
    Processor: KSTREAM-MAP-0000000007 (stores: [])
      --> KSTREAM-PEEK-0000000008
      <-- KSTREAM-SOURCE-0000000003
    Processor: KSTREAM-PEEK-0000000008 (stores: [])
      --> KSTREAM-FILTER-0000000010
      <-- KSTREAM-MAP-0000000007
    Processor: KSTREAM-FILTER-0000000010 (stores: [])
      --> KSTREAM-SINK-0000000009
      <-- KSTREAM-PEEK-0000000008
    Sink: KSTREAM-SINK-0000000009 (topic: KSTREAM-PEEK-0000000008-repartition)
      <-- KSTREAM-FILTER-0000000010

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000011 (topics: [KSTREAM-PEEK-0000000008-repartition])
      --> KSTREAM-LEFTJOIN-0000000012
    Processor: KSTREAM-LEFTJOIN-0000000012 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
      --> KSTREAM-KEY-SELECT-0000000013
      <-- KSTREAM-SOURCE-0000000011
    Processor: KSTREAM-KEY-SELECT-0000000013 (stores: [])
      --> KSTREAM-PEEK-0000000014
      <-- KSTREAM-LEFTJOIN-0000000012
    Processor: KSTREAM-PEEK-0000000014 (stores: [])
      --> KSTREAM-FILTER-0000000016
      <-- KSTREAM-KEY-SELECT-0000000013
    Processor: KSTREAM-FILTER-0000000016 (stores: [])
      --> KSTREAM-SINK-0000000015
      <-- KSTREAM-PEEK-0000000014
    Source: KSTREAM-SOURCE-0000000005 (topics: [appconnect_user_stream])
      --> KTABLE-SOURCE-0000000006
    Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-PEEK-0000000014-repartition)
      <-- KSTREAM-FILTER-0000000016
    Processor: KTABLE-SOURCE-0000000006 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
      --> none
      <-- KSTREAM-SOURCE-0000000005

All of these operations use the same key. I have 5 brokers and 50 partitions for all topics. I have a concurrency of 10, and I scaled my app to 5 instances. But, as I said, I repartition and transfer the data 3-4 times on the same key. That means all the values from my flatMap and map operations go to the same partition. Only 1-2 times do I use a different key, so only then are messages distributed to different partitions. Does this affect my performance? Or should I definitely distribute across different partitions to increase performance?

So basically, does Kafka perform better when it runs join or repartition operations 3-4 times using only one partition across the topics, because it reads from only that one partition, knows exactly where to read, and can read all the data at once since the data sits physically side by side on the disk (SSD or HDD)? Or, my second scenario: should I definitely use more partitions so that reads happen in parallel across partitions?

I also think that using peek slows down my processing.
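To make the "same key, same partition" reasoning above concrete, here is a minimal sketch that needs no cluster. It mirrors, to the best of our understanding, the formula Kafka's default partitioner applies to a keyed record, toPositive(murmur2(keyBytes)) % numPartitions, and assumes the key is a String serialized as UTF-8 (as StringSerializer does). The class name, method names and sample keys are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class KeyPartitioningSketch {

    // Murmur2 hash, following the algorithm used for keyed records by
    // Kafka's default partitioner (Utils.murmur2 in kafka-clients).
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the 1-3 trailing bytes (intentional fall-through).
        final int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // partition = toPositive(murmur2(keyBytes)) % numPartitions
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // bytes StringSerializer would write
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Number of distinct partitions a batch of record keys lands on.
    static int distinctPartitions(List<String> keys, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (String key : keys) {
            used.add(partitionFor(key, numPartitions));
        }
        return used.size();
    }

    public static void main(String[] args) {
        final int numPartitions = 50;

        // 1,000 records that all carry the same (hypothetical) key.
        List<String> oneKey = Collections.nCopies(1000, "user-42");

        // 1,000 records with 1,000 different (hypothetical) keys.
        List<String> manyKeys = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            manyKeys.add("user-" + i);
        }

        System.out.println("same key -> partition " + partitionFor("user-42", numPartitions)
                + "; distinct partitions used: " + distinctPartitions(oneKey, numPartitions));
        System.out.println("distinct keys -> distinct partitions used: "
                + distinctPartitions(manyKeys, numPartitions));
    }
}
```

If a record is re-keyed to the key it already had, the formula returns the same partition number every time (assuming every topic in the chain has 50 partitions and uses the default partitioner), so each repartition round-trip writes the record to the same partition number of another topic rather than spreading it out. How evenly the work spreads over the 50 partitions, and therefore over the stream threads, depends on how many distinct keys there are.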

Answer

The peek() operation is unrelated. Looking at the topology description you posted, your program is (partly) as follows:

KStream inputUser = builder.stream(...).flatMap(...).peek(...).filter(...);
KStream inputDevice = builder.stream(...).flatMap(...).peek(...).filter(...);
inputUser.join(inputDevice, ...);

(It would be easier if you posted your code in the question, too.)

Because you call flatMap(), Kafka Streams assumes that you changed the key, and hence calling join() triggers data repartitioning. The repartition topic name is generated from an upstream operator (to be fair, I am not 100% sure why PEEK is picked instead of FILTER).
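The repartitioning described above can be reproduced without a running cluster by building a small topology and printing Topology#describe(). This is a minimal sketch, assuming the kafka-streams library is on the classpath; the topic names user-ids and user-table and the class name are hypothetical. It chains flatMap(), peek() and a stream-table join, like the sub-topologies in the question, and the printed description should contain a sink that writes to a topic ending in -repartition. Generated node numbers, and possibly the exact repartition topic name, can differ between Kafka Streams versions.

```java
import java.util.Collections;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class FlatMapRepartitionSketch {

    // Builds source -> flatMap -> peek -> join(KTable) and returns the
    // topology description. Topic names are hypothetical.
    static String describeFlatMapJoin() {
        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, String> users =
                builder.table("user-table", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> ids =
                builder.stream("user-ids", Consumed.with(Serdes.String(), Serdes.String()));

        // The mapper returns the key unchanged, but flatMap() is allowed to emit
        // any key, so Kafka Streams has to assume the key changed.
        KStream<String, String> flatMapped =
                ids.flatMap((key, value) -> Collections.singletonList(KeyValue.pair(key, value)));

        // peek() is stateless and does not add a topic of its own.
        KStream<String, String> peeked =
                flatMapped.peek((key, value) -> System.out.println("peek: " + key));

        // The stream-table join needs co-partitioned input, so a repartition
        // topic is inserted in front of it.
        peeked.join(users, (id, user) -> id + "|" + user);

        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(describeFlatMapJoin());
    }
}
```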

"And all of my these operations use same KEY."

In that case, you might want to use flatMapValues() instead of flatMap(). Then Kafka Streams knows that the key did not change, and thus it will not create a repartition topic.

Similarly, if the key does not change, you might want to use mapValues() instead of map() to avoid unnecessary repartitioning.
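A minimal sketch of that change, under the same assumptions as a flatMap() version of the topology (kafka-streams on the classpath; hypothetical topic names user-ids and user-table, which are assumed to be co-partitioned, i.e. same key and same partition count; hypothetical comma-separated values). Because only values are transformed, the description printed by Topology#describe() should contain no -repartition topic in front of the join.

```java
import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class FlatMapValuesSketch {

    // Same join shape, but only values are transformed, so Kafka Streams
    // knows the key (and therefore the partitioning) is unchanged.
    static String describeFlatMapValuesJoin() {
        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, String> users =
                builder.table("user-table", Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> ids =
                builder.stream("user-ids", Consumed.with(Serdes.String(), Serdes.String()));

        // Hypothetical value format: a comma-separated list, expanded into one record per item.
        KStream<String, String> expanded =
                ids.flatMapValues(value -> Arrays.asList(value.split(",")));

        // mapValues() instead of map(): the key is left as it is.
        KStream<String, String> trimmed = expanded.mapValues(value -> value.trim());

        KStream<String, String> peeked =
                trimmed.peek((key, value) -> System.out.println("peek: " + key));

        // No repartition step: the stream is still partitioned by its original key.
        peeked.join(users, (id, user) -> id + "|" + user);

        return builder.build().describe().toString();
    }

    public static void main(String[] args) {
        System.out.println(describeFlatMapValuesJoin());
    }
}
```

Comparing the two descriptions is a quick way to check whether a change removed a repartition step. In the second posted topology, KSTREAM-PEEK-0000000014-repartition follows a KSTREAM-KEY-SELECT node, which comes from selectKey(); selectKey() also marks the stream for repartitioning, so the same check applies there.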

My question is I can read from topic "topic read push-processing-KSTREAM-PEEK-0000000014-repartition" but I can not read when I say "topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning"

I am not sure what you mean by this. What does

when I say "topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning"

mean? Do you refer to the command line tool bin/kafka-console-consumer.sh? In general, yes, you can read from a repartition topic, but I am not sure why this would be useful.