How to split records into different streams, from one topic to different streams?


Problem description

I have a single source CSV file containing records of different sizes, and every record is pushed into one source topic. From that source topic, I want to split the records into different KStreams/KTables. I have a pipeline for one table load: I push records from the source topic into stream1 in delimited format, then into another stream in AVRO format, which is consumed by a JDBC sink connector that writes the records into a MySQL database. The pipeline needs to stay the same. But I want to push records for different tables into one source topic and then split the records into different streams based on one value. Is this possible? I tried searching for ways to do that but could not find any. Can I also improve the pipeline somehow, use a KTable instead of KStreams, or make any other modifications?

My current flow: one source CSV file (source.csv) -> source topic (name - sourcetopic, containing test1 records) -> stream 1 (delimited value format) -> stream 2 (AVRO value format) -> end topic (name - sink-db-test1) -> JDBC sink connector -> MySQL DB (name - test1)

I have a different MySQL table, test2, with a different schema, and records for this table are also present in the source.csv file. Since the schema is different, I cannot follow test1's current pipeline to insert data into the test2 table.

Example - in the CSV source file:

line 1 - 9,atm,mun,ronaldo
line 2 - 10,atm,mun,bravo,num2
line 3 - 11,atm,sign,bravo,sick

Here, the value on which the records are to be split is column 4 (ronaldo or bravo). All of this data should be loaded into table 1, table 2, and table 3 respectively; the key is column 4:

If col4 == ronaldo, go to table 1
If col4 == bravo and col3 == mun, go to table 2
If col4 == bravo and col3 == sign, go to table 3

I am very new to Kafka; I only started Kafka development last week.

Recommended answer

You can write a separate Kafka Streams application that splits records from the input topic into different KStreams or output topics using the KStream#branch() operator:

KStream<K, V> inputKStream = streamsBuilder.stream("your_input_topic", Consumed.with(keySerde, valueSerde));

KStream<K, V>[] branches = inputKStream.branch(
        (key, value) -> { /* return true if the record matches logic 1 */ },
        (key, value) -> { /* return true if the record matches logic 2 */ },
        (key, value) -> true  // catch-all: receives every record not matched above
);

// branches[0]: records matching logic 1
// branches[1]: records matching logic 2
// branches[2]: all remaining records (logic 3)
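Applied to the question's CSV example, the three predicates can split on columns 3 and 4 of the delimited value. Below is a minimal sketch, assuming the record value arrives as the raw comma-delimited line; sink-db-test1 is the topic name from the question, while sink-db-test2 and sink-db-test3 are hypothetical names for the other two tables:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder streamsBuilder = new StreamsBuilder();

// The value is assumed to be the raw CSV line, e.g. "9,atm,mun,ronaldo"
KStream<String, String> input = streamsBuilder.stream(
        "sourcetopic", Consumed.with(Serdes.String(), Serdes.String()));

KStream<String, String>[] branches = input.branch(
        (key, value) -> value.split(",")[3].equals("ronaldo"),  // col4 == ronaldo -> table 1
        (key, value) -> value.split(",")[3].equals("bravo")
                     && value.split(",")[2].equals("mun"),      // col4 == bravo, col3 == mun -> table 2
        (key, value) -> value.split(",")[3].equals("bravo")
                     && value.split(",")[2].equals("sign")      // col4 == bravo, col3 == sign -> table 3
);

branches[0].to("sink-db-test1");  // existing topic feeding the JDBC sink for test1
branches[1].to("sink-db-test2");  // hypothetical topic for the test2 pipeline
branches[2].to("sink-db-test3");  // hypothetical topic for the test3 pipeline

Note that each record lands only in the first branch whose predicate matches, and branch() silently drops any record that matches none of the predicates; add a final (key, value) -> true predicate if you need a catch-all branch.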

Or you could branch your KStream manually, like this:

KStream<K, V> inputKStream = streamsBuilder.stream("your_input_topic", Consumed.with(keySerde, valueSerde));

inputKStream
        .filter((key, value) -> { /* filter logic for topic 1 here */ })
        .to("your_1st_output_topic");

inputKStream
        .filter((key, value) -> { /* filter logic for topic 2 here */ })
        .to("your_2nd_output_topic");
...
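As an aside, in newer Kafka Streams versions (2.8 and later) KStream#branch() is deprecated in favor of KStream#split(), which lets each branch be routed inline. Here is a minimal sketch of the same routing with the newer API, under the same assumptions as the CSV example above:

import org.apache.kafka.streams.kstream.Branched;

// 'input' is the KStream<String, String> from the branch() sketch above
input
        .split()
        .branch((key, value) -> value.split(",")[3].equals("ronaldo"),
                Branched.withConsumer(ks -> ks.to("sink-db-test1")))
        .branch((key, value) -> value.split(",")[3].equals("bravo")
                             && value.split(",")[2].equals("mun"),
                Branched.withConsumer(ks -> ks.to("sink-db-test2")))
        .branch((key, value) -> value.split(",")[3].equals("bravo")
                             && value.split(",")[2].equals("sign"),
                Branched.withConsumer(ks -> ks.to("sink-db-test3")))
        .noDefaultBranch();  // records matching no predicate are dropped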
