Flink: Sharing state in CoFlatMapFunction


Question

I got a bit stuck with CoFlatMapFunction. It seems to work fine if I place it on the DataStream before the window, but it fails if placed after the window's "apply" function.

I was testing two streams: the main "Features" stream on flatMap1, constantly ingesting data, and the control stream "Model" on flatMap2, changing the model on request.

I am able to set b0/b1 and see them properly set in flatMap2, but flatMap1 always sees b0 and b1 as 0, the value they were given at initialization.

Am I missing something obvious here?

public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
    private static final long serialVersionUID = 1L;

    Double b0;
    Double b1;

    public applyModel(){
        b0=0.0;
        b1=0.0;
    }

    @Override
    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
        System.out.print("Main: " + this + "\n");
    }

    @Override
    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
        System.out.print("Old Model: " + this + "\n");
        b0 = value.getB0();
        b1 = value.getB1();
        System.out.print("New Model: " + this + "\n");
    }

    @Override
    public String toString(){
        return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
    }
}

Recommended answer

Here is the answer from the mailing list:

Is the CoFlatMapFunction intended to be executed in parallel?

If yes, you need some way to deterministically assign which record goes to which parallel instance. In a way, the CoFlatMapFunction does a parallel (partitioned) join between the model and the result of the session windows, so you need some form of key that selects which partition the elements go to. Does that make sense?

If not, try setting its parallelism to 1 explicitly.

Greetings,
Stephan
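The point about parallel instances in the reply above can be illustrated without Flink. The following is a minimal plain-Java simulation, not Flink code: `Instance` stands in for one parallel copy of the user function, and `route` is a hypothetical hash-based router playing the role of key partitioning. It shows why a model update that lands in one instance is invisible to a feature record processed by another, and how routing both records by the same key fixes that. In Flink itself the keyed variant would probably be wired by keying both inputs of the connected streams (for example `keyBy` with one key selector per input), but that wiring is an assumption about the asker's job, not something shown in the question.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Flink dependency) of parallel function instances.
class ParallelInstancesSketch {

    // Stand-in for one parallel copy of the function; each copy has its own fields.
    static final class Instance {
        double b0 = 0.0;
        double b1 = 0.0;

        void onModel(double newB0, double newB1) {
            b0 = newB0;
            b1 = newB1;
        }

        // Applies the model known to THIS instance to one feature value.
        double onFeature(double x) {
            return b0 + b1 * x;
        }
    }

    // Hypothetical router: the same key always maps to the same instance index.
    static int route(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static List<Instance> newInstances(int parallelism) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
        return instances;
    }

    public static void main(String[] args) {
        int parallelism = 4;

        // Unkeyed: the model update reaches instance 0, the feature record instance 1.
        List<Instance> unkeyed = newInstances(parallelism);
        unkeyed.get(0).onModel(1.0, 2.0);
        System.out.println("unkeyed: " + unkeyed.get(1).onFeature(3.0)); // b0/b1 are still 0 here

        // Keyed: both records carry the same key, so they meet in the same instance.
        List<Instance> keyed = newInstances(parallelism);
        String key = "sensor-a"; // hypothetical key value
        keyed.get(route(key, parallelism)).onModel(1.0, 2.0);
        System.out.println("keyed: " + keyed.get(route(key, parallelism)).onFeature(3.0));
    }
}
```

With a single instance (parallelism 1) the unkeyed case behaves like the keyed one, which is why setting the parallelism to 1 is the other suggested fix.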

A global state that all parallel instances can access read-only is doable via broadcast().
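As a rough illustration of the broadcast idea, here is a plain-Java simulation (again not Flink code): every simulated instance receives its own copy of each model record, so whichever instance a feature record lands on, it sees the current model. `Instance` and `broadcastModel` are hypothetical names for this sketch. In a Flink job the equivalent wiring would presumably be something like `features.connect(model.broadcast()).flatMap(new applyModel())`, where `features` and `model` are assumed names for the two streams.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (no Flink dependency) of broadcasting a control record.
class BroadcastSketch {

    // Stand-in for one parallel copy of the function, with its own b0/b1.
    static final class Instance {
        double b0 = 0.0;
        double b1 = 0.0;

        void onModel(double newB0, double newB1) {
            b0 = newB0;
            b1 = newB1;
        }

        double onFeature(double x) {
            return b0 + b1 * x;
        }
    }

    static List<Instance> newInstances(int parallelism) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
        return instances;
    }

    // Broadcast: every instance gets its own copy of the model record.
    static void broadcastModel(List<Instance> instances, double b0, double b1) {
        for (Instance instance : instances) {
            instance.onModel(b0, b1);
        }
    }

    public static void main(String[] args) {
        List<Instance> instances = newInstances(3);
        broadcastModel(instances, 1.0, 2.0);

        // Feature records are spread round-robin; every instance knows the model.
        double[] features = {1.0, 2.0, 3.0, 4.0};
        for (int n = 0; n < features.length; n++) {
            Instance target = instances.get(n % instances.size());
            System.out.println(target.onFeature(features[n]));
        }
    }
}
```

Each instance only reads its local copy here, which matches the read-only framing above. Feature records that arrive before the first model record would still see the initial values.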

A global state that is available to all for both reads and updates is currently not available. Consistent operations on it would be quite costly and would require some form of distributed communication/consensus.

Instead, I would encourage you to go with one of the following:

1) If you can partition the state, use keyBy().mapWithState(). That localizes state operations and makes them very fast.

2) If your state is not organized by key, it is probably very small, and you may be able to use a non-parallel operation.

3) If some operation updates the state and another one accesses it, you can often implement that with iterations and a CoFlatMapFunction (one side is the original input, the other the feedback input).

In the end, all of these approaches localize state access and modification, which is a good pattern to follow where possible.

Greetings,
Stephan
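To make option 1 a little more concrete, here is a small plain-Java sketch of the idea behind a keyed, stateful map: state lives in a per-key slot, and each record only touches the slot for its own key. This is a simulation under assumptions, not the Flink API; `KeyedStateSketch` and its running-count logic are hypothetical and chosen only because they are easy to follow.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java simulation (no Flink dependency) of state that is scoped per key.
class KeyedStateSketch {

    // One state slot per key; a record can only reach the slot of its own key.
    private final Map<String, Long> countPerKey = new HashMap<>();

    // Stateful map: reads the state for the record's key, updates it, and
    // returns the new running count as the output for this record.
    long map(String key) {
        return countPerKey.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        KeyedStateSketch function = new KeyedStateSketch();
        String[] keys = {"a", "b", "a", "a", "b"}; // hypothetical record keys
        for (String key : keys) {
            System.out.println(key + " -> " + function.map(key));
        }
    }
}
```

Because no state is shared across keys, each key's records can be handled by whichever parallel instance owns that key without any coordination between instances.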
