Apache Beam: TTL in State Spec

Question

We are reading from Kinesis and writing to Parquet, and we use StateSpec<ValueState<Boolean>> to avoid processing records twice after gracefully stopping the pipeline and relaunching it from the last savepoint.

We saw that some records were duplicated because they ended up on a different task manager on subsequent relaunches, so we use StateSpec<ValueState<Boolean>> to store stateful information about the processed records and avoid duplicates.

We are trying to work out how to clear the state periodically without the risk of losing the most recently processed records, in case they are needed at an upcoming stop (i.e., we need something like a TTL on that class).

We thought about a timer that clears the state periodically, but that doesn't meet our requirements because we need to keep the most recently processed records.

We read that using event-time processing automatically clears state information after a window expires, and we would like to know whether that fits our requirement when using the StateSpec class.

Otherwise, is there a class for storing state that has some kind of TTL we could use to implement this feature?

What we have right now is this piece of code, which checks whether the element has already been processed, and a method that clears the state periodically:

    // Per-key flag: true once the element for this key has been processed.
    @StateId("keyPreserved")
    private final StateSpec<ValueState<Boolean>> keyPreserved = StateSpecs.value(BooleanCoder.of());
    // Processing-time timer intended to reset the flag.
    @TimerId("resetStateTimer")
    private final TimerSpec resetStateTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @ProcessElement
    public void processElement(ProcessContext context,
        @TimerId("resetStateTimer") Timer resetStateTimer,
        @StateId("keyPreserved") ValueState<Boolean> keyPreservedState) {
        // Note: the timer is injected here but never set in this snippet.
        // read() returns null when no value has been written for this key yet.
        if (!firstNonNull(keyPreservedState.read(), false)) {
            T message = context.element().getValue();

            // Process element here

            keyPreservedState.write(true);
        }
    }

    // Fires for a single key and clears that key's flag.
    @OnTimer("resetStateTimer")
    public void onResetStateTimer(OnTimerContext context,
        @StateId("keyPreserved") ValueState<Boolean> keyPreservedState) {
        keyPreservedState.clear();
    }

Recommended answer

Setting the timer every time we call keyPreservedState.write(true); was enough. When the timer expires, keyPreservedState.clear(); clears only the state for the element in context, not the whole state. We are now dealing with a constantly increasing checkpoint size, but that is another issue...
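To make the per-key behaviour described above concrete, here is a minimal plain-Java model of it. It is not Beam code: the class PerKeyTtlDedup, its methods, and the two maps are hypothetical names invented for this sketch, and time is passed in explicitly instead of coming from a runner. In the real DoFn, the "set the timer" step would presumably be a call such as resetStateTimer.offset(ttl).setRelative() placed right after keyPreservedState.write(true); the TTL value itself is an assumption, since the answer does not state one.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Plain-Java model (NOT the Beam API) of per-key state with a per-key expiry timer. */
class PerKeyTtlDedup {
    private final long ttlMillis;
    // Models ValueState<Boolean> per key: an entry means "already processed".
    private final Map<String, Boolean> seen = new HashMap<>();
    // Models the per-key processing-time timer: key -> time at which it fires.
    private final Map<String, Long> timers = new HashMap<>();

    PerKeyTtlDedup(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if the element is processed, false if it is dropped as a duplicate. */
    boolean process(String key, long nowMillis) {
        fireDueTimers(nowMillis);
        if (seen.getOrDefault(key, false)) {
            return false; // duplicate: state untouched, timer not reset
        }
        seen.put(key, true);                    // models keyPreservedState.write(true)
        timers.put(key, nowMillis + ttlMillis); // models setting the timer on every write
        return true;
    }

    /** Fires every timer that is due; each one clears only its own key's state. */
    void fireDueTimers(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> timer = it.next();
            if (timer.getValue() <= nowMillis) {
                seen.remove(timer.getKey());    // models keyPreservedState.clear()
                it.remove();
            }
        }
    }

    /** Number of keys that currently hold state. */
    int liveKeys() {
        return seen.size();
    }
}
```

With a TTL of 1000 ms, a key first processed at time 0 is rejected as a duplicate at time 500 and accepted again at time 1000, while a different key first processed at time 900 is still remembered at time 1000. Recently processed keys therefore survive while older ones expire, which is the TTL-like behaviour the question asks for.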
