KStream-KTable join writing to the KTable: How to sync the join with the KTable write?


Question

I'm having some issues with how the following topology behaves:

String topic = config.topic();

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    .peek((k, v) -> L.info("Event:"+v.action))
    // join the event with the corresponding entry in the KTable and apply the state mutation
    .leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
    .peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

My issue happens when I receive different events at the same time. My state mutation is done by the leftJoin and then written by the to method. If events 1 and 2 are received at the same time with the same key, the following can occur:

event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic
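The lost update above can be sketched without Kafka at all (a plain `Map` standing in for the KTable, toy string states rather than the original `MyData`/`MyEvent` types): both joins read the same snapshot of the table before either result has been written back, so the second write clobbers the first.

```java
import java.util.HashMap;
import java.util.Map;

public class LostUpdateSketch {

    // Toy join: append the event to the current state (stand-in for handleEvent).
    static String apply(String state, String event) {
        return (state == null ? "" : state + "+") + event;
    }

    static String run() {
        Map<String, String> table = new HashMap<>(); // stands in for the KTable

        // Both joins read the table *before* either result has been written back:
        String afterEvent1 = apply(table.get("id1"), "event1"); // sees state A (none) -> X
        String afterEvent2 = apply(table.get("id1"), "event2"); // still sees none -> Y

        // The writes land afterwards, in order:
        table.put("id1", afterEvent1); // X
        table.put("id1", afterEvent2); // Y overwrites X: event1's change is lost
        return table.get("id1");
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "event2" — event1 is gone
    }
}
```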

Because of that, state Y doesn't have the changes from event1, so I lost data.

Here's what I see in the logs (the Processing:... part is logged from inside the value joiner):

Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1

Event1 can be considered the creation event: it will create the entry in the KTable, so it doesn't matter if the state is empty. Event2, though, needs to apply its changes to an existing state, but it doesn't find any, because the first state mutation still hasn't been written to the KTable (it still hasn't been processed by the to method).

Is there a way to ensure that my leftJoin and my write to the KTable are done atomically?

Thanks

Update & current solution

Thanks to @Matthias's answer I was able to find a solution using a Transformer.

Here's the code. The Transformer:

public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {

    private final String                    stateName;
    private final ValueJoiner<V1, V2, V2>   joiner;
    private final boolean                   updateState;

    private KeyValueStore<K, V2>            state;

    public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
        this.stateName = stateName;
        this.joiner = joiner;
        this.updateState = updateState;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
    }

    @Override
    public KeyValue<K, V2> transform(K key, V1 value) {
        V2 stateValue = this.state.get(key); // Get current state
        V2 updatedValue = joiner.apply(value, stateValue); // Apply join
        if (updateState) {
            this.state.put(key, updatedValue); // write new state
        }
        return new KeyValue<>(key, updatedValue);
    }

    @Override
    public KeyValue<K, V2> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {}
}
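Why this fixes the race can also be sketched without Kafka (hypothetical names, a plain `Map` standing in for the state store): because a stream task is single-threaded, each record finishes its get → join → put cycle before the next record is processed, so event2 sees event1's update.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransformJoinSketch {

    static String run() {
        Map<String, String> store = new HashMap<>(); // stands in for the KTable's state store

        // Records are processed one at a time, in offset order:
        for (String event : List.of("event1", "event2")) {
            String state = store.get("id1");                              // read
            String updated = (state == null ? "" : state + "+") + event;  // join
            store.put("id1", updated);          // write, before the next record is read
        }
        return store.get("id1");
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "event1+event2": no lost update
    }
}
```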

And the adapted topology:

String topic = config.topic();
String store = topic + "-store";

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    // join the event with the corresponding entry in the KTable and apply the state mutation
    .transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

As we're using the KTable's KV StateStore and applying changes directly to it through the put method, events should always pick up the updated state. One thing I'm still wondering: what if I have a continuously high throughput of events?

Could there still be a race condition between the put we do on the KTable's KV store and the writes done to the KTable's topic?

Answer

A KTable is sharded into multiple physical stores, and each store is only updated by a single thread. Thus, the scenario you describe cannot happen. If you have two records with the same timestamp that both update the same shard, they will be processed one after the other (in offset order). Thus, the second update will see the state after the first update.

So maybe you just didn't describe your scenario correctly?

Update

You cannot mutate the state when doing a join. Thus, the expectation that

event1 joins with state A => state A mutated to state X

is wrong. Independent of any processing order, when event1 joins with state A, it will access state A in read-only mode, and state A will not be modified.

Thus, when event2 joins, it will see the same state as event1. For a stream-table join, the table state is only updated when new data is read from the table's input topic.

If you want a shared state that is updated from both inputs, you need to build a custom solution using transform():

builder.addStore(..., "store-name");
builder.stream("table-topic").transform(..., "store-name"); // will not emit anything downstream
KStream result = builder.stream("stream-topic").transform(..., "store-name");

This will create one store that is shared by both processors, and both can read and write as they wish. Thus, for the table input you can just update the state without sending anything downstream, while for the stream input you can do the join, update the state, and send a result downstream.
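That pattern can be sketched in plain Java (names hypothetical, a `Map` standing in for the shared store): the table-input processor only writes to the store and emits nothing, while the stream-input processor joins against the store, updates it, and emits downstream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SharedStoreSketch {

    static List<String> run() {
        Map<String, String> store = new HashMap<>(); // the one store both processors share
        List<String> downstream = new ArrayList<>();

        // transform() on the table topic: update the state, emit nothing.
        store.put("id1", "stateA");

        // transform() on the stream topic: join, update the state, emit the result.
        for (String event : List.of("event1", "event2")) {
            String joined = store.get("id1") + "|" + event;
            store.put("id1", joined);
            downstream.add(joined); // event2 sees event1's update
        }
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [stateA|event1, stateA|event1|event2]
    }
}
```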

Update 2

With regard to the solution: there will be no race condition between the updates the Transformer applies to the state and the records the Transformer processes after the state update. This part is executed in a single thread, and records are processed in offset order from the input topic. Thus, it's ensured that a state update will be visible to later records.
