将数据从Kafka商店转移到Kafka主题 [英] Transfer data from Kafka store to a Kafka topic
本文介绍了将数据从Kafka商店转移到Kafka主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我想用卡夫卡做这样的事情:
- 继续在KStream/Ktable/Kafka-store中存储数据
- 当我的应用程序收到特定事件/数据时,仅将上述存储区中的特定数据集发送到主题。
我们可以用卡夫卡做这个吗?我不认为单独使用Kafka消费者会有帮助,因为当一组数据已被使用时,我们不能启动/暂停消费者。
推荐答案
创建流和加入流的配置如下:-
CREATE STREAM Customer Profile WITH (
KAFKA_TOPIC = 'Customer Profile',
VALUE_FORMAT = 'AVRO',
KEY_FORMAT = 'AVRO'
);
CREATE STREAM Customer Buy WITH (
KAFKA_TOPIC = 'Customer Buy',
VALUE_FORMAT = 'AVRO',
KEY_FORMAT = 'AVRO'
);
在这里,我使用从MySQL表创建的另一个主题创建一个Kafka Streams
流创建后,我们需要在Apply Joins和Filter之后创建另一个流,但流中的JOIN有一个条件,即我们只能应用与两个流的JOIN。 以下是加入示例:-CREATE STREAM account_summary_by_category_by_month_by_accunt_no(
account_no int,
date_time timestamp,
category text,
category_description text,
amount float,
customer_id int,
FROM Customer_Profile cp INNER JOIN Customer_Buy cb
WITHIN 7 DAYS
ON cp.id = cb.order_id;
您可以在此应用所有Kesql函数,如用于筛选数据的SQL
这篇关于将数据从Kafka商店转移到Kafka主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文