KStream-KTable join writing to the KTable: How to sync the join with the KTable write?


Question

I'm having some issues with how the following topology behaves:

String topic = config.topic();

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    .peek((k, v) -> L.info("Event:"+v.action))
    // join the event with the according entry in the KTable and apply the state mutation
    .leftJoin(myTable, eventHandler::handleEvent, UUIDSerdes.get(), EventSerdes.get())
    .peek((k, v) -> L.info("Updated:" + v.id + "-" + v.id2))
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

My issue happens when I receive different events at the same time. As my state mutation is done by the leftJoin and then written by the to method, the following can occur if event 1 and event 2 are received at the same time with the same key:

event1 joins with state A => state A mutated to state X
event2 joins with state A => state A mutated to state Y
state X written to the KTable topic
state Y written to the KTable topic

Because of that, state Y doesn't have the changes from event1, so I lost data.
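The lost update comes from the read happening at join time while the write back to the table topic happens later. A minimal plain-Java sketch of that interleaving (illustrative names only; a HashMap stands in for the KTable state, and the deferred writes stand in for the to() step):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the read-join-now, write-back-later interleaving: both joins
// read the table state before either join result has been written back.
public class LostUpdateSketch {
    public static String replay(List<String> events) {
        Map<String, String> table = new HashMap<>();    // stands in for the KTable state
        List<String> pendingWrites = new ArrayList<>(); // join results not yet written back

        for (String event : events) {
            String state = table.get("key");            // leftJoin reads the current state
            String mutated = (state == null ? "" : state) + event; // ValueJoiner applies the event
            pendingWrites.add(mutated);                 // the .to() write happens later
        }
        // the writes to the table topic only land after both joins already ran
        String last = null;
        for (String write : pendingWrites) {
            table.put("key", write);
            last = write;
        }
        return last; // final table state
    }
}
```

Replaying two events yields only the second event's mutation: `replay(List.of("e1", "e2"))` returns `"e2"`, and event1's change is gone.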

Here's what I see in the logs (the Processing:... part is logged from inside the value joiner):

Event:Event1
Event:Event2
Processing:Event1, State:none
Updated:1-null
Processing:Event2, State:none
java.lang.IllegalStateException: Event2 event received but we don't have data for id 1

Event1 can be considered the creation event: it will create the entry in the KTable, so it doesn't matter that the state is empty. Event2, though, needs to apply its changes to an existing state, but it doesn't find any, because the first state mutation still hasn't been written to the KTable (it still hasn't been processed by the to method).

Is there a way to ensure that my leftJoin and my write into the KTable are done atomically?

Thanks

Update & current solution

Thanks to the response of @Matthias I was able to find a solution using a Transformer.

Here's the code. The Transformer:

public class KStreamStateLeftJoin<K, V1, V2> implements Transformer<K, V1, KeyValue<K, V2>> {

    private final String                    stateName;
    private final ValueJoiner<V1, V2, V2>   joiner;
    private final boolean                   updateState;

    private KeyValueStore<K, V2>            state;

    public KStreamStateLeftJoin(String stateName, ValueJoiner<V1, V2, V2> joiner, boolean updateState) {
        this.stateName = stateName;
        this.joiner = joiner;
        this.updateState = updateState;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.state = (KeyValueStore<K, V2>) context.getStateStore(stateName);
    }

    @Override
    public KeyValue<K, V2> transform(K key, V1 value) {
        V2 stateValue = this.state.get(key); // Get current state
        V2 updatedValue = joiner.apply(value, stateValue); // Apply join
        if (updateState) {
            this.state.put(key, updatedValue); // write new state
        }
        return new KeyValue<>(key, updatedValue);
    }

    @Override
    public KeyValue<K, V2> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {}
}

And here's the adapted topology:

String topic = config.topic();
String store = topic + "-store";

KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic, store);

// Receive a stream of various events
topology.eventsStream()
    // Only process events that are implementing MyEvent
    .filter((k, v) -> v instanceof MyEvent)
    // Cast to ease the code
    .mapValues(v -> (MyEvent) v)
    // rekey by data id
    .selectKey((k, v) -> v.data.id)
    // join the event with the according entry in the KTable and apply the state mutation
    .transform(() -> new KStreamStateLeftJoin<UUID, MyEvent, MyData>(store, eventHandler::handleEvent, true), store)
    // write the updated state to the KTable.
    .to(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);

As we're using the KTable's KV StateStore and applying the changes directly in it through the put method, events should always pick up the updated state. One thing I'm still wondering: what if I have a continuous high throughput of events?

Could there still be a race condition between the puts we perform on the KTable's KV store and the writes done to the KTable's topic?

Accepted answer

A KTable is sharded into multiple physical stores and each store is only updated by a single thread. Thus, the scenario you describe cannot happen. If you have 2 records with the same timestamp that both update the same shard, they will be processed one after the other (in offset order). Thus, the second update will see the state after the first update.

So maybe you just didn't describe your scenario correctly?

Update

You cannot mutate the state when doing a join. Thus, the expectation that

event1 joins with state A => state A mutated to state X

is wrong. Independent of any processing order, when event1 joins with state A, it will access state A in read-only mode and state A will not be modified.

Thus, when event2 joins, it will see the same state as event1. For a stream-table join, the table state is only updated when new data is read from the table input topic.

If you want to have a shared state that is updated from both inputs, you would need to build a custom solution using transform():

builder.addStore(..., "store-name");
builder.stream("table-topic").transform(..., "store-name"); // will not emit anything downstream
KStream result = builder.stream("stream-topic").transform(..., "store-name");

This will create one store that is shared by both processors, and both can read/write as they wish. Thus, for the table input you can just update the state without sending anything downstream, while for the stream input you can do the join, update the state, and send a result downstream.

Update 2

With regard to the solution: there will be no race condition between the updates the Transformer applies to the state and the records the Transformer processes after the state update. This part is executed in a single thread, and records are processed in offset order from the input topic. Thus, it's ensured that a state update will be available to later records.
