如何在Flink中支持多个KeyBy [英] How to support multiple KeyBy in Flink

查看:2246
本文介绍了如何在Flink中支持多个KeyBy的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在下面的代码示例中,我试图获取一组员工记录{国家,雇主,姓名,工资,年龄},并倾销每个国家中薪水最高的员工.不幸的是,多个KEY BY无效.

In code sample below, I am trying to get a stream of employee records {Country, Employer, Name, Salary, Age } and dumping highest paid employee in every country. Unfortunately Multiple KEY By doesn't work.

仅KeyBy(Employer)正在反映,因此我没有得到正确的结果. 我想念什么?

Only KeyBy(Employer) is reflecting, thus I don't get correct result. What am I missing?

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Employee> streamEmployee = env.addSource(
            new FlinkKafkaConsumer010<ObjectNode>("flink-demo", new JSONDeserializationSchema(), properties))
            .map(new MapFunction<ObjectNode, Employee>() {

                private static final long serialVersionUID = 6111226274068863916L;

                @Override
                public Employee map(ObjectNode value) throws Exception {
                    final Gson gson = new GsonBuilder().create();
                    Employee uMsg = gson.fromJson(value.toString(), Employee.class);
                    return uMsg;
                }
            });

    KeyedStream<Employee, String> employeesKeyedByCountryndEmployer = streamEmployee
            .keyBy(new KeySelector<Employee, String>() {
                private static final long serialVersionUID = -6867736771747690202L;

                @Override
                public String getKey(Employee value) throws Exception {
                    // TODO Auto-generated method stub
                    return value.getCountry();
                }
            }).keyBy(new KeySelector<Employee, String>() {
                private static final long serialVersionUID = -6867736771747690202L;

                @Override
                public String getKey(Employee value) throws Exception {
                    // TODO Auto-generated method stub
                    return value.getEmployer();
                }
            });
    // This should display employees highly paid in a given country , for a
    // given employer
    DataStream<Employee> uHighlyPaidEmployee = employeesKeyedByCountryndEmployer.timeWindow(Time.seconds(5))
            .maxBy("salary");

    // Assume toString() is overridden , so print works well.
    uHighlyPaidEmployee.print();

    env.execute("Employee-employer log processor");

推荐答案

您可以定义返回组合键的KeySelector:

You can define a KeySelector that returns a composite key:

KeyedStream<Employee, Tuple2<String, String>> employeesKeyedByCountryndEmployer = 
  streamEmployee.keyBy(
    new KeySelector<Employee, Tuple2<String, String>>() {

      @Override
      public Tuple2<String, String> getKey(Employee value) throws Exception {
        return Tuple2.of(value.getCountry(), value.getEmployer());
      }
    }
  );

这篇关于如何在Flink中支持多个KeyBy的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆