Kafka Streams table transformations

Problem Description

I've got a table in SQL Server that I'd like to stream to a Kafka topic; the structure is as follows:

(UserID, ReportID)

This table is going to be continuously changed (records added or inserted, but never updated).

I'd like to transform this into the following kind of structure and put it into Elasticsearch:

{
  "UserID": 1,
  "Reports": [1, 2, 3, 4, 5, 6]
}

The examples I've seen so far are logs or click-stream data, which do not work in my case.

Is this kind of use case possible at all? I could always just look at UserID changes and query the database, but that seems naive and not the best approach.

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;

public class MyDemo {
  public static void main(String... args) {
    System.out.println("Hello KTable!");

    final Serde<Long> longSerde = Serdes.Long();

    KStreamBuilder builder = new KStreamBuilder();

    KStream<Long, Long> reportPermission = builder.stream(TOPIC);

    KTable<Long, ArrayList<Long>> result = reportPermission
        .groupByKey()
        .aggregate(
            new Initializer<ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply() {
                // must return an empty list, not null, otherwise the aggregator NPEs on the first record
                return new ArrayList<>();
              }
            },
            new Aggregator<Long, Long, ArrayList<Long>>() {
              @Override
              public ArrayList<Long> apply(Long key, Long value, ArrayList<Long> aggregate) {
                aggregate.add(value);
                return aggregate;
              }
            },
            new Serde<ArrayList<Long>>() {
              @Override
              public void configure(Map<String, ?> configs, boolean isKey) {}

              @Override
              public void close() {}

              @Override
              public Serializer<ArrayList<Long>> serializer() {
                return null; // placeholder - a real serializer for ArrayList<Long> is needed here
              }

              @Override
              public Deserializer<ArrayList<Long>> deserializer() {
                return null; // placeholder - a real deserializer is needed here as well
              }
            });

    result.to("report-aggregated-topic");

    KafkaStreams streams = new KafkaStreams(builder, createStreamProperties());
    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }

  private static final String TOPIC = "report-permission";

  private static final Properties createStreamProperties() {
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "report-permission-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

    return props;
  }
}

I'm actually getting stuck at the aggregate stage because I cannot write a proper SerDe for ArrayList<Long> (not enough skills yet), and lambdas don't seem to work on the aggregator - it cannot infer the type of agg:

KTable<Long, ArrayList<Long>> sample = builder.stream(TOPIC)
    .groupByKey()
    .aggregate(
        () -> new ArrayList<Long>(),
        (key, val, agg) -> agg.add(val),  // add() returns boolean, not the list, so the aggregate type can't be inferred
        longSerde                         // and this is a Serde<Long>, not a Serde<ArrayList<Long>>
    );

Recommended Answer

You can use Kafka's Connect API to get the data from SQL Server into Kafka. I am not aware of any specific connector for SQL Server, but you can use any generic JDBC-based connector: https://www.confluent.io/product/connectors/
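
For example, with the Confluent JDBC source connector, a standalone connector configuration could look roughly like the sketch below. All hostnames, credentials, table and column names are placeholders, and incrementing mode assumes the table has an auto-incrementing id column for tracking new rows:

name=report-permission-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# SQL Server connection details are placeholders
connection.url=jdbc:sqlserver://sqlserver:1433;databaseName=reports;user=kafka;password=secret
table.whitelist=ReportPermission
# append-only table: publish only newly added rows
mode=incrementing
incrementing.column.name=Id
topic.prefix=sqlserver-

The connector writes each table to topic.prefix plus the table name, so the Streams application would read from that topic; since the JDBC source does not set message keys by default, a map()/selectKey() step may still be needed to key the records by UserID.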

To process the data you can use Kafka's Streams API. You can simply aggregate() all reports per user. Something like this:

KTable<UserId, List<Reports>> result =
    builder.stream("topic-name")
           .groupByKey()
           // init a new empty list and
           // `add` the items to the list in the actual aggregation
           .aggregate(...);

result.to("result-topic");
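
To unblock the aggregation step from the question, here is a minimal sketch of one way to write the missing Serde<ArrayList<Long>>: the list is encoded as a length-prefixed sequence of longs via a ByteBuffer. The class name LongListSerde and the encoding are illustrative choices rather than anything prescribed by the answer, and the snippet assumes the same pre-1.0 KStreamBuilder API used in the question:

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Map;

public class LongListSerde implements Serde<ArrayList<Long>> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {}

  @Override
  public void close() {}

  @Override
  public Serializer<ArrayList<Long>> serializer() {
    return new Serializer<ArrayList<Long>>() {
      @Override
      public void configure(Map<String, ?> configs, boolean isKey) {}

      @Override
      public byte[] serialize(String topic, ArrayList<Long> data) {
        if (data == null) {
          return null;
        }
        // 4 bytes for the element count, 8 bytes per element
        ByteBuffer buffer = ByteBuffer.allocate(4 + 8 * data.size());
        buffer.putInt(data.size());
        for (Long value : data) {
          buffer.putLong(value);
        }
        return buffer.array();
      }

      @Override
      public void close() {}
    };
  }

  @Override
  public Deserializer<ArrayList<Long>> deserializer() {
    return new Deserializer<ArrayList<Long>>() {
      @Override
      public void configure(Map<String, ?> configs, boolean isKey) {}

      @Override
      public ArrayList<Long> deserialize(String topic, byte[] data) {
        if (data == null) {
          return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int count = buffer.getInt();
        ArrayList<Long> values = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
          values.add(buffer.getLong());
        }
        return values;
      }

      @Override
      public void close() {}
    };
  }
}

With such a serde in place, the lambda version from the question only needs an aggregator body that returns the list itself (ArrayList.add() returns a boolean, which is why type inference failed):

KTable<Long, ArrayList<Long>> reportsPerUser = reportPermission
    .groupByKey()
    .aggregate(
        ArrayList::new,
        (key, value, agg) -> { agg.add(value); return agg; },
        new LongListSerde());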

Check out the docs for more details about the Streams API: https://docs.confluent.io/current/streams/index.html

Note that you need to make sure the list of reports does not grow unbounded. Kafka has a (configurable) maximum message size, and the whole list will be contained in a single message. Thus, you should estimate the maximum message size and apply the corresponding configuration (-> max.message.bytes) before going into production. Check out the configs at: http://kafka.apache.org/documentation/#brokerconfigs
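
For example, the per-topic limit could be raised programmatically with the AdminClient; the topic name and the ~5 MB value below are only placeholders, and the same setting can also be changed with the kafka-configs command-line tool:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public class RaiseTopicMessageSize {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      ConfigResource topic =
          new ConfigResource(ConfigResource.Type.TOPIC, "report-aggregated-topic");
      // allow messages of up to ~5 MB on this topic (example value)
      Config config = new Config(Collections.singletonList(
          new ConfigEntry("max.message.bytes", "5242880")));
      admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
    }
  }
}

Depending on the aggregate sizes, the Streams application's internal producer may also need its max.request.size raised to match.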

Finally, you can use the Connect API to push the data into Elasticsearch. There are multiple different connectors available (I would of course recommend the Confluent one). More details about the Connect API: https://docs.confluent.io/current/connect/userguide.html
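
A matching Elasticsearch sink connector configuration could look roughly like this sketch; the connector class and property names are those of the Confluent Elasticsearch sink, while the topic, URL and type name are placeholders:

name=report-elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=report-aggregated-topic
connection.url=http://elasticsearch:9200
type.name=kafka-connect
# use the record key (the UserID) as the document id
key.ignore=false

With key.ignore=false, each new aggregate for a user is written under the same document id and therefore replaces the previous document for that user.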
