Apache Flink: Skewed data distribution on KeyedStream

Problem Description

I have this Java code in Flink:

env.setParallelism(6);

//Read from Kafka topic with 12 partitions
DataStream<String> line = env.addSource(myConsumer);

//Filter half of the records
//(line_Num is a DataStream<Tuple2<String, Integer>> derived from 'line'; that conversion is not shown here)
DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());
DataStream<Tuple3<String, String, Integer>> line_Num_Odd_2 = line_Num_Odd.map(new OddAdder());

//Filter the other half
DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());
DataStream<Tuple3<String, String, Integer>> line_Num_Even_2 = line_Num_Even.map(new EvenAdder());

//Join all the data again
DataStream<Tuple3<String, String, Integer>> line_Num_U = line_Num_Odd_2.union(line_Num_Even_2);

//Window
DataStream<Tuple3<String, String, Integer>> windowedLine_Num_U_K = line_Num_U
                .keyBy(1)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .reduce(new Reducer());

The problem is that the window should be able to process with parallelism = 2, as there are two different groups of data, keyed by "odd" and "even" in the second String of the Tuple3. Everything runs with parallelism 6 except the window, which runs with parallelism = 1, and I need it to have parallelism = 2 because of my requirements.

The functions used in the code are the following:

public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {

    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isOdd = (Long.valueOf(line.f0.split(" ")[0]) % 2) != 0;
        return isOdd;
    }
};


public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {

    public boolean filter(Tuple2<String, Integer> line) throws Exception {
        Boolean isEven = (Long.valueOf(line.f0.split(" ")[0]) % 2) == 0;
        return isEven;
    }
};

public static class OddAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(line.f0, "odd", line.f1);
        return newLine;
    }
};


public static class EvenAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(line.f0, "even", line.f1);
        return newLine;
    }
};

public static class Reducer implements ReduceFunction<Tuple3<String, String, Integer>> {

    public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> line1,
            Tuple3<String, String, Integer> line2) throws Exception {
        Long sum = Long.valueOf(line1.f0.split(" ")[0]) + Long.valueOf(line2.f0.split(" ")[0]);
        Long sumTS = Long.valueOf(line1.f0.split(" ")[1]) + Long.valueOf(line2.f0.split(" ")[1]);
        Tuple3<String, String, Integer> newLine = new Tuple3<String, String, Integer>(String.valueOf(sum) +
                " " + String.valueOf(sumTS), line1.f1, line1.f2 + line2.f2);
        return newLine;
    }
};

Thanks for your help!

SOLUTION: I have changed the content of the keys from "odd" and "even" to "odd0000" and "even1111", and now it works properly.
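
For reference, a minimal sketch of what the changed mappers could look like. Only the key literals differ from the OddAdder/EvenAdder shown above; the same MapFunction/Tuple imports and the rest of the pipeline are assumed unchanged:

public static class OddAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        // Longer key string; reportedly hashes to a different subtask than "even1111"
        return new Tuple3<String, String, Integer>(line.f0, "odd0000", line.f1);
    }
};

public static class EvenAdder implements MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

    public Tuple3<String, String, Integer> map(Tuple2<String, Integer> line) throws Exception {
        return new Tuple3<String, String, Integer>(line.f0, "even1111", line.f1);
    }
};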

Recommended Answer

Keys are distributed to worker threads by hash partitioning. This means that the key values are hashed and the thread is determined by the hash modulo the number of workers. With two keys and two threads, there is a good chance that both keys are assigned to the same thread.

You can try to use different key values whose hash values distribute across both threads.
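
To get a feeling for whether two candidate key strings would land on different worker threads, you can emulate the scheme described above with a quick standalone check. The hypothetical KeyDistributionCheck class below is only a simplified model (plain Java hashCode modulo the number of workers); Flink applies its own hash function on top of the key's hashCode, so treat it as an illustration of the idea rather than Flink's exact assignment:

public class KeyDistributionCheck {

    public static void main(String[] args) {
        int numWorkers = 2;  // desired parallelism of the windowed operator
        String[] candidateKeys = {"odd", "even", "odd0000", "even1111"};
        for (String key : candidateKeys) {
            // hash the key and pick a worker by modulo, as described above
            int worker = Math.abs(key.hashCode() % numWorkers);
            System.out.println("key '" + key + "' -> worker " + worker);
        }
    }
}

If two keys print the same worker index, they would collide under this simple model; choosing longer or suffixed key strings, as in the solution above, is one way to end up with hashes that spread across both threads.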

