Kafka Streams table transformations
Problem Description
I've got a table in SQL Server that I'd like to stream to a Kafka topic; the structure is as follows:
(UserID, ReportID)
This table is going to be continuously changed (records added/inserted, no updates).
I'd like to transform this into this kind of structure and put it into Elasticsearch:
{
  "UserID": 1,
  "Reports": [1, 2, 3, 4, 5, 6]
}
The examples I've seen so far are logs or click-streams, and they do not work in my case.
Is this kind of use case possible at all? I could always just look at UserID changes and query the database, but that seems naive and not the best approach.
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;

public class MyDemo {

    public static void main(String... args) {
        System.out.println("Hello KTable!");

        final Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();

        KStream<Long, Long> reportPermission = builder.stream(TOPIC);

        KTable<Long, ArrayList<Long>> result = reportPermission
            .groupByKey()
            .aggregate(
                new Initializer<ArrayList<Long>>() {
                    @Override
                    public ArrayList<Long> apply() {
                        // start each user with an empty list of report IDs
                        return new ArrayList<>();
                    }
                },
                new Aggregator<Long, Long, ArrayList<Long>>() {
                    @Override
                    public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
                        aggregate.add(value);
                        return aggregate;
                    }
                },
                new Serde<ArrayList<Long>>() {
                    // this is the SerDe I don't know how to implement properly
                    @Override
                    public void configure(Map<String, ?> configs, boolean isKey) {}

                    @Override
                    public void close() {}

                    @Override
                    public Serializer<ArrayList<Long>> serializer() {
                        return null;
                    }

                    @Override
                    public Deserializer<ArrayList<Long>> deserializer() {
                        return null;
                    }
                });

        result.to("report-aggregated-topic");

        KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
        streams.cleanUp();
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    private static final String TOPIC = "report-permission";

    private static Properties createStreamProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        return props;
    }
}
I'm actually getting stuck at the aggregate stage because I cannot write a proper SerDe for ArrayList&lt;Long&gt; (not enough skills yet), and lambdas don't seem to work for the aggregator - the compiler can't infer the type of agg:
KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
    .groupByKey()
    .aggregate(
        () -> new ArrayList<Long>(),
        (key, val, agg) -> agg.add(val),
        longSerde
    );
Recommended Answer
You can use Kafka's Connect API to get the data from SQL Server into Kafka. I am not aware of any SQL Server-specific connector, but you can use any generic JDBC-based connector: https://www.confluent.io/product/connectors/
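For illustration only, a standalone JDBC source connector configuration might look roughly like the following; the connection URL, credentials, table, and column names are placeholders, and incrementing mode is chosen because the table described in the question only ever receives new rows:

# hypothetical JDBC source connector config - adjust names and URLs to your setup
name=report-permission-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:sqlserver://dbhost:1433;databaseName=Reports
connection.user=kafka_connect
connection.password=********
table.whitelist=ReportPermission
mode=incrementing
incrementing.column.name=ID
topic.prefix=sqlserver-

The connector writes each table to a topic named topic.prefix plus the table name, so the Streams application below would read from that topic.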
To process the data you can use Kafka's Streams API. You can simply aggregate() all reports per user. Something like this:
KTable<UserId, List<Reports>> result =
    builder.stream("topic-name")
        .groupByKey()
        // init a new empty list and
        // `add` the items to the list in the actual aggregation
        .aggregate(...);

result.to("result-topic");
Check out the docs for more details about the Streams API: https://docs.confluent.io/current/streams/index.html
Note that you need to make sure that the list of reports does not grow unbounded. Kafka has a (configurable) maximum message size, and the whole list will be contained in a single message. Thus, you should estimate the maximum message size and apply the corresponding configuration (-> max.message.bytes) before going into production. Check out the configs at the webpage: http://kafka.apache.org/documentation/#brokerconfigs
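For example, the topic-level override could be applied with Kafka's AdminClient, as in this sketch; the topic name and the ~2 MB limit are placeholders, and note that alterConfigs replaces the full set of existing topic-level overrides:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public class RaiseMaxMessageBytes {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // allow messages on the output topic up to ~2 MB
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "report-aggregated-topic");
            Config config = new Config(
                Collections.singleton(new ConfigEntry("max.message.bytes", "2097152")));
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}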
Finally, you use the Connect API to push the data into Elasticsearch. There are multiple different connectors available (I would of course recommend the Confluent one). More details about the Connect API: https://docs.confluent.io/current/connect/userguide.html