Reading the peek topic from Kafka Streams
Question
I have a topic named push-processing-KSTREAM-PEEK-0000000014-repartition, which is an internal Kafka topic. I did not create this topic. I am using the .peek() method after a repartition, and I use peek 3-4 times.
My question is: I can read from the topic with "topic read push-processing-KSTREAM-PEEK-0000000014-repartition", but I cannot read from it when I run "topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning".
Is this internal topic created because of the peek method?
Or is it related to some other repartitioning code in the stream, even though its name contains KSTREAM-PEEK?
It has 50 partitions. Since peek is a stateless operation, it should not create internal topics, right? So why is the name related to peek, and why can't I read the topic from the beginning?
Any ideas, please?
Here is the first topology:
Sub-topology: 0
  Source: KSTREAM-SOURCE-0000000000 (topics: [appconnect_deviceIds_exported_for_push])
    --> KSTREAM-FLATMAP-0000000004
  Processor: KSTREAM-FLATMAP-0000000004 (stores: [])
    --> KSTREAM-PEEK-0000000005
    <-- KSTREAM-SOURCE-0000000000
  Processor: KSTREAM-PEEK-0000000005 (stores: [])
    --> KSTREAM-FILTER-0000000007
    <-- KSTREAM-FLATMAP-0000000004
  Processor: KSTREAM-FILTER-0000000007 (stores: [])
    --> KSTREAM-SINK-0000000006
    <-- KSTREAM-PEEK-0000000005
  Sink: KSTREAM-SINK-0000000006 (topic: KSTREAM-PEEK-0000000005-repartition)
    <-- KSTREAM-FILTER-0000000007
Sub-topology: 1
  Source: KSTREAM-SOURCE-0000000008 (topics: [KSTREAM-PEEK-0000000005-repartition])
    --> KSTREAM-JOIN-0000000009
  Source: KSTREAM-SOURCE-0000000028 (topics: [KSTREAM-PEEK-0000000025-repartition])
    --> KSTREAM-JOIN-0000000029
  Processor: KSTREAM-JOIN-0000000009 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
    --> KSTREAM-MAP-0000000010
    <-- KSTREAM-SOURCE-0000000008
  Processor: KSTREAM-JOIN-0000000029 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
    --> KSTREAM-PEEK-0000000030
    <-- KSTREAM-SOURCE-0000000028
  Processor: KSTREAM-MAP-0000000010 (stores: [])
    --> KSTREAM-PEEK-0000000011
    <-- KSTREAM-JOIN-0000000009
  Processor: KSTREAM-PEEK-0000000030 (stores: [])
    --> KSTREAM-MAP-0000000031
    <-- KSTREAM-JOIN-0000000029
  Processor: KSTREAM-MAP-0000000031 (stores: [])
    --> KSTREAM-SINK-0000000032
    <-- KSTREAM-PEEK-0000000030
  Processor: KSTREAM-PEEK-0000000011 (stores: [])
    --> KSTREAM-SINK-0000000012
    <-- KSTREAM-MAP-0000000010
  Source: KSTREAM-SOURCE-0000000002 (topics: [appconnect_device_stream])
    --> KTABLE-SOURCE-0000000003
  Sink: KSTREAM-SINK-0000000012 (topic: appconnect_devices_exported_for_push)
    <-- KSTREAM-PEEK-0000000011
  Sink: KSTREAM-SINK-0000000032 (topic: appconnect_devices_exported_for_push)
    <-- KSTREAM-MAP-0000000031
  Processor: KTABLE-SOURCE-0000000003 (stores: [appconnect_device_stream-STATE-STORE-0000000001])
    --> none
    <-- KSTREAM-SOURCE-0000000002
Sub-topology: 2
  Source: KSTREAM-SOURCE-0000000013 (topics: [appconnect_userIds_exported_for_push])
    --> KSTREAM-FLATMAP-0000000017
  Processor: KSTREAM-FLATMAP-0000000017 (stores: [])
    --> KSTREAM-PEEK-0000000018
    <-- KSTREAM-SOURCE-0000000013
  Processor: KSTREAM-PEEK-0000000018 (stores: [])
    --> KSTREAM-FILTER-0000000020
    <-- KSTREAM-FLATMAP-0000000017
  Processor: KSTREAM-FILTER-0000000020 (stores: [])
    --> KSTREAM-SINK-0000000019
    <-- KSTREAM-PEEK-0000000018
  Sink: KSTREAM-SINK-0000000019 (topic: KSTREAM-PEEK-0000000018-repartition)
    <-- KSTREAM-FILTER-0000000020
Sub-topology: 3
  Source: KSTREAM-SOURCE-0000000021 (topics: [KSTREAM-PEEK-0000000018-repartition])
    --> KSTREAM-JOIN-0000000022
  Processor: KSTREAM-JOIN-0000000022 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
    --> KSTREAM-PEEK-0000000023
    <-- KSTREAM-SOURCE-0000000021
  Processor: KSTREAM-PEEK-0000000023 (stores: [])
    --> KSTREAM-MAP-0000000024
    <-- KSTREAM-JOIN-0000000022
  Processor: KSTREAM-MAP-0000000024 (stores: [])
    --> KSTREAM-PEEK-0000000025
    <-- KSTREAM-PEEK-0000000023
  Processor: KSTREAM-PEEK-0000000025 (stores: [])
    --> KSTREAM-FILTER-0000000027
    <-- KSTREAM-MAP-0000000024
  Processor: KSTREAM-FILTER-0000000027 (stores: [])
    --> KSTREAM-SINK-0000000026
    <-- KSTREAM-PEEK-0000000025
  Source: KSTREAM-SOURCE-0000000015 (topics: [appconnect_user_stream])
    --> KTABLE-SOURCE-0000000016
  Sink: KSTREAM-SINK-0000000026 (topic: KSTREAM-PEEK-0000000025-repartition)
    <-- KSTREAM-FILTER-0000000027
  Processor: KTABLE-SOURCE-0000000016 (stores: [appconnect_user_stream-STATE-STORE-0000000014])
    --> none
    <-- KSTREAM-SOURCE-0000000015
And here is the second topology:
Sub-topology: 0
  Source: KSTREAM-SOURCE-0000000017 (topics: [KSTREAM-PEEK-0000000014-repartition])
    --> KSTREAM-JOIN-0000000018
  Processor: KSTREAM-JOIN-0000000018 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
    --> KSTREAM-FILTER-0000000019
    <-- KSTREAM-SOURCE-0000000017
  Processor: KSTREAM-FILTER-0000000019 (stores: [])
    --> KSTREAM-SINK-0000000020
    <-- KSTREAM-JOIN-0000000018
  Source: KSTREAM-SOURCE-0000000001 (topics: [appconnect_push_processing_submissions])
    --> KTABLE-SOURCE-0000000002
  Sink: KSTREAM-SINK-0000000020 (topic: appconnect_push_send_bulk)
    <-- KSTREAM-FILTER-0000000019
  Processor: KTABLE-SOURCE-0000000002 (stores: [appconnect_push_processing_submissions-STATE-STORE-0000000000])
    --> none
    <-- KSTREAM-SOURCE-0000000001
Sub-topology: 1
  Source: KSTREAM-SOURCE-0000000003 (topics: [appconnect_devices_exported_for_push])
    --> KSTREAM-MAP-0000000007
  Processor: KSTREAM-MAP-0000000007 (stores: [])
    --> KSTREAM-PEEK-0000000008
    <-- KSTREAM-SOURCE-0000000003
  Processor: KSTREAM-PEEK-0000000008 (stores: [])
    --> KSTREAM-FILTER-0000000010
    <-- KSTREAM-MAP-0000000007
  Processor: KSTREAM-FILTER-0000000010 (stores: [])
    --> KSTREAM-SINK-0000000009
    <-- KSTREAM-PEEK-0000000008
  Sink: KSTREAM-SINK-0000000009 (topic: KSTREAM-PEEK-0000000008-repartition)
    <-- KSTREAM-FILTER-0000000010
Sub-topology: 2
  Source: KSTREAM-SOURCE-0000000011 (topics: [KSTREAM-PEEK-0000000008-repartition])
    --> KSTREAM-LEFTJOIN-0000000012
  Processor: KSTREAM-LEFTJOIN-0000000012 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
    --> KSTREAM-KEY-SELECT-0000000013
    <-- KSTREAM-SOURCE-0000000011
  Processor: KSTREAM-KEY-SELECT-0000000013 (stores: [])
    --> KSTREAM-PEEK-0000000014
    <-- KSTREAM-LEFTJOIN-0000000012
  Processor: KSTREAM-PEEK-0000000014 (stores: [])
    --> KSTREAM-FILTER-0000000016
    <-- KSTREAM-KEY-SELECT-0000000013
  Processor: KSTREAM-FILTER-0000000016 (stores: [])
    --> KSTREAM-SINK-0000000015
    <-- KSTREAM-PEEK-0000000014
  Source: KSTREAM-SOURCE-0000000005 (topics: [appconnect_user_stream])
    --> KTABLE-SOURCE-0000000006
  Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-PEEK-0000000014-repartition)
    <-- KSTREAM-FILTER-0000000016
  Processor: KTABLE-SOURCE-0000000006 (stores: [appconnect_user_stream-STATE-STORE-0000000004])
    --> none
    <-- KSTREAM-SOURCE-0000000005
All of these operations use the same key. I have 5 brokers and 50 partitions for all topics. I use a concurrency of 10, and I scaled my app to 5 instances. But as I said, I am repartitioning and transferring the data 3-4 times on the same key. That means all my values from the flatMap and map operations go to the same partition. Only 1-2 times do I use a different key, so the messages are distributed to different partitions in just those 1-2 steps. Does this affect my performance? Or should I definitely distribute the data across different partitions to increase performance?
So basically, does Kafka perform better when it runs a join or repartition 3-4 times using only one partition between the topics, because it reads from one and only one partition, knows exactly where to read, and can read all the data at once since the data sits physically together on the disk (SSD or HDD)? Or, in my second scenario, should I definitely use more partitions so that reads happen in parallel across partitions?
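The "same key means same partition" reasoning above follows from how Kafka assigns keyed records to partitions. The stdlib-only Java sketch below ports the rule that Kafka's default partitioner applies to a non-null key: partition = (murmur2(keyBytes) & 0x7fffffff) % numPartitions. The class name, the UTF-8 String key, and the sample keys such as "device-42" are illustrative assumptions, and a custom StreamPartitioner would change the result. It shows that every record with one key lands in a single partition (so a single task processes it), while distinct keys spread over the 50 partitions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of Kafka's default key-to-partition mapping (hypothetical class name). */
class KeyPartitionSketch {

    // Port of the 32-bit murmur2 variant Kafka uses for keyed records.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the 1-3 trailing bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Assumes the key is a String serialized as UTF-8 bytes.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 50; // partition count from the question
        Map<Integer, Integer> perPartition = new TreeMap<>();
        // 1,000 records sharing one hypothetical key
        for (int i = 0; i < 1000; i++) {
            perPartition.merge(partitionFor("device-42", partitions), 1, Integer::sum);
        }
        System.out.println("same key, partitions used: " + perPartition.size());
        perPartition.clear();
        // 1,000 records with distinct hypothetical keys
        for (int i = 0; i < 1000; i++) {
            perPartition.merge(partitionFor("device-" + i, partitions), 1, Integer::sum);
        }
        System.out.println("distinct keys, partitions used: " + perPartition.size());
    }
}
```

The first line of output is always 1; the second depends on the key bytes but is well above 1, which is the parallelism the question is asking about.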
I also think that using peek slows down my processing.
Answer
The peek() operation is unrelated. Looking at the topology description you posted, your program is (partly) as follows:
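KStream inputDevice = builder.stream("appconnect_deviceIds_exported_for_push").flatMap(...).peek(...).filter(...);
KStream inputUser = builder.stream("appconnect_userIds_exported_for_push").flatMap(...).peek(...).filter(...);
inputDevice.join(builder.table("appconnect_device_stream"), ...);
inputUser.join(builder.table("appconnect_user_stream"), ...);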
(It would be easier if you posted your code in the question, too.)
Because you call flatMap(), Kafka Streams assumes that you change the key, and hence calling join() triggers data repartitioning. The repartition topic name is generated from the upstream operator (to be fair, I am not 100% sure why PEEK is picked instead of FILTER).
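The push-processing- prefix on the topic in the question is not part of the generated name itself: Kafka Streams creates every internal topic on the broker with the application.id as a prefix, so the name printed in the topology description gets "<application.id>-" prepended. In the second topology, that topic is the sink of KSTREAM-SINK-0000000015 and is fed by KSTREAM-KEY-SELECT-0000000013 followed by KSTREAM-PEEK-0000000014, i.e. a selectKey() and then a peek() before the join. The tiny sketch below only illustrates the prefixing; the helper name is hypothetical, and the application.id value push-processing is an assumption inferred from the topic name in the question.

```java
/** Sketch: broker-side name of an internal topic (hypothetical helper). */
class RepartitionTopicName {

    // Internal topics are created as "<application.id>-<name shown in the topology>".
    static String brokerTopicName(String applicationId, String topologyTopicName) {
        return applicationId + "-" + topologyTopicName;
    }

    public static void main(String[] args) {
        // Assumption: application.id = "push-processing".
        String name = brokerTopicName("push-processing", "KSTREAM-PEEK-0000000014-repartition");
        System.out.println(name); // push-processing-KSTREAM-PEEK-0000000014-repartition
    }
}
```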
"All of these operations use the same key."
In this case, you might want to use flatMapValues() instead of flatMap(). Kafka Streams then knows that the key did not change, and thus it will not create a repartition topic.
Similarly, if the key does not change, you might want to use mapValues() instead of map() to avoid unnecessary repartitioning.
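To make the flatMap()/flatMapValues() and map()/mapValues() advice concrete, here is a stdlib-only toy model, not the Kafka Streams API, of the bookkeeping described above: an operator that may change the key marks the stream as needing a repartition, value-only operators such as peek() and filter() carry the existing state forward, and a join on a marked stream is where the repartition topic appears. ToyStream and its methods are hypothetical names. With the real DSL, the corresponding change is calling flatMapValues(...) or mapValues(...) instead of flatMap(...) or map(...), which is only correct if the key really stays the same.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (NOT the Kafka Streams API) of the "repartition required" flag. */
class ToyStream {
    private final List<String> operators;
    private final boolean repartitionRequired;

    private ToyStream(List<String> operators, boolean repartitionRequired) {
        this.operators = operators;
        this.repartitionRequired = repartitionRequired;
    }

    static ToyStream source(String topic) {
        List<String> ops = new ArrayList<>();
        ops.add("source(" + topic + ")");
        return new ToyStream(ops, false);
    }

    private ToyStream then(String operator, boolean mayChangeKey) {
        List<String> next = new ArrayList<>(operators);
        next.add(operator);
        // Once set, the flag stays set: a later value-only step cannot undo a key change.
        return new ToyStream(next, repartitionRequired || mayChangeKey);
    }

    // Operators that may change the key
    ToyStream map()           { return then("map", true); }
    ToyStream flatMap()       { return then("flatMap", true); }
    ToyStream selectKey()     { return then("selectKey", true); }
    // Value-only operators
    ToyStream mapValues()     { return then("mapValues", false); }
    ToyStream flatMapValues() { return then("flatMapValues", false); }
    ToyStream peek()          { return then("peek", false); }
    ToyStream filter()        { return then("filter", false); }

    /** True if joining this stream would first route it through a repartition topic. */
    boolean joinNeedsRepartition() { return repartitionRequired; }

    @Override
    public String toString() { return String.join(" -> ", operators); }

    public static void main(String[] args) {
        ToyStream keyed = source("appconnect_deviceIds_exported_for_push").flatMap().peek().filter();
        ToyStream valuesOnly = source("appconnect_deviceIds_exported_for_push").flatMapValues().peek().filter();
        System.out.println(keyed + " | repartition before join: " + keyed.joinNeedsRepartition());
        System.out.println(valuesOnly + " | repartition before join: " + valuesOnly.joinNeedsRepartition());
    }
}
```

The first chain reports true and the second false; peek() appears in both and never changes the outcome, which matches the point that peek() is unrelated to the repartition topic.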
"My question is: I can read from the topic with topic read push-processing-KSTREAM-PEEK-0000000014-repartition, but I cannot read from it when I run topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning."
I am not sure what you mean by this. What does "topic read push-processing-KSTREAM-PEEK-0000000014-repartition --from-beginning" mean? Are you referring to the command-line tool bin/kafka-console-consumer.sh? In general, yes, you can read from a repartition topic, but I am not sure why this would be useful.