Kafka Streams - 根据 Streams 数据发送不同的主题 [英] Kafka Streams - Send on different topics depending on Streams Data

查看:23
本文介绍了Kafka Streams - 根据 Streams 数据发送不同的主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 kafka 流应用程序,正在等待发布关于主题 user_activity 的记录.它将接收 json 数据,并根据我想将该流推送到不同主题的键的值.

I have a kafka streams application waiting for records to be published on topic user_activity. It will receive json data and depending on the value of against a key I want to push that stream into different topics.

这是我的流应用代码:

KStream<String, String> source_user_activity = builder.stream("user_activity");
        source_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                System.out.println("value: " +  value);
                ArrayList<String> keywords = new ArrayList<String>();
                try {
                    JSONObject send = new JSONObject();
                    JSONObject received = new JSONObject(value);

                    send.put("current_date", getCurrentDate().toString());
                    send.put("activity_time", received.get("CreationTime"));
                    send.put("user_id", received.get("UserId"));
                    send.put("operation_type", received.get("Operation"));
                    send.put("app_name", received.get("Workload"));
                    keywords.add(send.toString());
                    // apply regex to value and for each match add it to keywords

                } catch (Exception e) {
                    // TODO: handle exception
                    System.err.println("Unable to convert to json");
                    e.printStackTrace();
                }

                return keywords;
            }
        }).to("user_activity_by_date");

在此代码中,我想检查操作类型,然后根据该类型将流推送到相关主题中.

In this code, I want to check operation type and then depending on that I want to push the streams into the relevant topic.

我怎样才能做到这一点?

How can I achieve this?

我已将代码更新为:

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
KStream<String, String>[] branches = source_o365_user_activity.branch( 
      (key, value) -> (value.contains("Operation\":\"SharingSet") && value.contains("ItemType\":\"File")),
      (key, value) -> (value.contains("Operation\":\"AddedToSecureLink") && value.contains("ItemType\":\"File")),
      (key, value) -> true
     );

branches[0].to("o365_sharing_set_by_date");
branches[1].to("o365_added_to_secure_link_by_date");
branches[2].to("o365_user_activity_by_date");

推荐答案

您可以使用 branch 方法来拆分您的流.此方法采用谓词将源流拆分为多个流.

You can use branch method in order to split your stream. This method takes predicates for splitting the source stream into several streams.

以下代码摘自kafka-streams-examples:

KStream<String, OrderValue>[] forks = ordersWithTotals.branch(
    (id, orderValue) -> orderValue.getValue() >= FRAUD_LIMIT,
    (id, orderValue) -> orderValue.getValue() < FRAUD_LIMIT);

forks[0].mapValues(
    orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, FAIL))
    .to(ORDER_VALIDATIONS.name(), Produced
        .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));

forks[1].mapValues(
    orderValue -> new OrderValidation(orderValue.getOrder().getId(), FRAUD_CHECK, PASS))
    .to(ORDER_VALIDATIONS.name(), Produced
  .with(ORDER_VALIDATIONS.keySerde(), ORDER_VALIDATIONS.valueSerde()));

这篇关于Kafka Streams - 根据 Streams 数据发送不同的主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆