How to split records into different streams, from one topic to different streams?

Problem description

I have a single source CSV file containing records of different shapes, and every record is pushed into one source topic. I want to split the records from that source topic into different KStreams/KTables. I have a pipeline for one table load: I push each record from the source topic into stream1 in delimited format, then into another stream in AVRO format, which feeds a JDBC sink connector that writes the record into a MySQL database. The pipeline needs to stay the same, but I want to push records for different tables into the one source topic and then split them into different streams according to one value. Is this possible? I tried searching for ways to do this but could not find any. Can I improve the pipeline somehow, or use a KTable instead of KStreams, or make any other modification?

My current flow - one source CSV file (source.csv) -> source topic (name - sourcetopic containing test1 records) -> stream 1 (delimited value format) -> stream 2 (as AVRO value format) -> end topic (name - sink-db-test1) -> JDBC sink connector -> MySQL DB (name - test1)

I have a different MySQL table test2 with a different schema, and the records for this table are also present in the source.csv file. Since the schema is different, I cannot follow the current test1 pipeline to insert data into the test2 table.

Example - in the CSV source file:

line 1 - 9,atm,mun,ronaldo
line 2 - 10,atm,mun,bravo,num2
line 3 - 11,atm,sign,bravo,sick

In this example, the value on which the records are to be split is column 4 (ronaldo or bravo). These records should be loaded into table 1, table 2, and table 3 respectively; the key is column 4.

if col4 == ronaldo, go to table 1
if col4 == bravo and col3 == mun, go to table 2
if col4 == bravo and col3 == sign, go to table 3

I am very new to Kafka; I only started Kafka development last week.

Answer

You can write a separate Kafka Streams application to split records from the input topic into different KStreams or output topics using the KStream#branch() operator:

KStream<K, V>[] branches = streamsBuilder
        .stream("your_input_topic", Consumed.with(keySerde, valueSerde))
        .branch(
                (key, value) -> /* filter logic for topic 1 here */ true,
                (key, value) -> /* filter logic for topic 2 here */ true,
                (key, value) -> true  // catch-all: every remaining record
        );

// branches[0] holds records matching predicate 1
// branches[1] holds records matching predicate 2
// branches[2] holds everything else
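For the concrete routing rules in the question, the branch predicates could be written as small helper methods over the delimited value. This is a plain-Java sketch, not part of any Kafka API: the class and method names are made up, and it assumes comma-delimited values with col3 and col4 at zero-based indices 2 and 3, as in the sample rows.

```java
public class RecordRouter {

    // Split the delimited value once; col3 and col4 sit at
    // zero-based indices 2 and 3 in the sample rows.
    static String[] cols(String value) {
        return value.split(",");
    }

    // col4 == ronaldo -> table 1
    static boolean isTable1(String value) {
        String[] c = cols(value);
        return c.length > 3 && c[3].equals("ronaldo");
    }

    // col4 == bravo and col3 == mun -> table 2
    static boolean isTable2(String value) {
        String[] c = cols(value);
        return c.length > 3 && c[3].equals("bravo") && c[2].equals("mun");
    }

    // col4 == bravo and col3 == sign -> table 3
    static boolean isTable3(String value) {
        String[] c = cols(value);
        return c.length > 3 && c[3].equals("bravo") && c[2].equals("sign");
    }
}
```

Each helper would then back one branch predicate, e.g. (key, value) -> RecordRouter.isTable1(value), and the matching branch can be sent .to("sink-db-test1") and so on.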

Or you could branch your KStream manually like this:

KStream<K, V> inputKStream = streamsBuilder.stream("your_input_topic", Consumed.with(keySerde, valueSerde));

inputKStream
        .filter((key, value) -> /* filter logic for topic 1 here */ true)
        .to("your_1st_output_topic");

inputKStream
        .filter((key, value) -> /* filter logic for topic 2 here */ true)
        .to("your_2nd_output_topic");
...
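To see what the manual filter()/to() chains do end to end without a Kafka cluster, here is a plain-Java sketch that routes the question's sample rows into three buckets using the same conditions. The bucket names stand in for the output topics, and the column indices (col3 at 2, col4 at 3) are assumed from the sample rows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class ManualBranchDemo {
    public static void main(String[] args) {
        List<String> sourceTopic = List.of(
                "9,atm,mun,ronaldo",
                "10,atm,mun,bravo,num2",
                "11,atm,sign,bravo,sick");

        // Same conditions as the question: col4 at index 3, col3 at index 2.
        Predicate<String> toTable1 = v -> v.split(",")[3].equals("ronaldo");
        Predicate<String> toTable2 = v -> v.split(",")[3].equals("bravo")
                && v.split(",")[2].equals("mun");
        Predicate<String> toTable3 = v -> v.split(",")[3].equals("bravo")
                && v.split(",")[2].equals("sign");

        // Each filter() pass reads the full input, like the KStream chains.
        List<String> table1 = filter(sourceTopic, toTable1);
        List<String> table2 = filter(sourceTopic, toTable2);
        List<String> table3 = filter(sourceTopic, toTable3);

        System.out.println(table1); // [9,atm,mun,ronaldo]
        System.out.println(table2); // [10,atm,mun,bravo,num2]
        System.out.println(table3); // [11,atm,sign,bravo,sick]
    }

    static List<String> filter(List<String> in, Predicate<String> p) {
        List<String> out = new ArrayList<>();
        for (String v : in) if (p.test(v)) out.add(v);
        return out;
    }
}
```

Note that each record lands in exactly one bucket here; if the predicates could overlap, branch() (first match wins) and independent filter() chains (a record can match several) behave differently.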
