Apache Flink: ProcessWindowFunction KeyBy() multiple values


Problem description

I'm trying to use a WindowFunction with a DataStream; my goal is to have a query like the following:

SELECT  *,
    count(id) OVER(PARTITION BY country) AS c_country,
    count(id) OVER(PARTITION BY city) AS c_city,
    count(id) OVER(PARTITION BY address) AS c_addrs
FROM fm
ORDER BY country

Previous answers have helped me with the aggregation by the country field, but I need to aggregate by two fields within the same time window. I don't know whether it is possible to have two or more keys in keyBy() for this case.

val parsed = stream2.map(x => {
      val arr = x.split(",")
      (arr(0).toInt, arr(1), arr(2)) // (id, country, city)
    })

    parsed
      .keyBy(x => x._2) // key by country
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        (Int, String, String), (Int, String, String, Int), String, TimeWindow
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String)],
                             out: Collector[(Int, String, String, Int)]): Unit = {
          val lst = elements.toList
          // emit each element together with the per-country count
          lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
        }
      }).print().setParallelism(1)

This works well for the first aggregation, but I'm missing the second aggregation, on the city field, within the same time window.

Input data:

10,"SPAIN","BARCELONA","C1"
20,"SPAIN","BARCELONA","C2"
30,"SPAIN","MADRID","C3"
30,"SPAIN","MADRID","C3"
80,"SPAIN","MADRID","C4"
90,"SPAIN","VALENCIA","C5"
40,"ITALY","ROMA","C6"
41,"ITALY","ROMA","C7"
42,"ITALY","VENECIA","C8"
50,"FRANCE","PARIS","C9"
60,"FRANCE","PARIS","C9"
70,"FRANCE","MARSELLA","C10"

Expected output:

(10,"SPAIN","BARCELONA",6,2,1)
(20,"SPAIN","BARCELONA",6,2,1)
(30,"SPAIN","MADRID",6,3,2)
(30,"SPAIN","MADRID",6,3,2)
(80,"SPAIN","MADRID",6,3,1)
(90,"SPAIN","VALENCIA",6,1,1)
(50,"FRANCE","PARIS",3,2,1)
(60,"FRANCE","PARIS",3,2,1)
(70,"FRANCE","MARSELLA",3,1,1)
(40,"ITALY","ROMA",3,2,2)
(41,"ITALY","ROMA",3,2,2)
(42,"ITALY","VENECIA",3,1,1)

---------------- UPDATE 2 ------------------

I currently want to do the aggregation over 3 columns. The option I'm using is to chain keyBy() outputs, but this becomes very long, complex, and not very readable. On top of that, I added a Time.seconds(1) time window, because without it the chained keyBy() treats the upstream output as individual events.

What I'd like to know is whether I can do these aggregations in a single process function.

The code I have is this long...

parsed
    .keyBy(_.country) // key by country
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        AlarmasIn, AlarmasOut, String, TimeWindow
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[AlarmasIn],
                             out: Collector[AlarmasOut]): Unit = {
          val lst = elements.toList
          lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address, lst.size,0,0)))
      }
      })
      .keyBy(_.city).window(TumblingProcessingTimeWindows.of(Time.seconds(1))) // then key by city
        .process(new ProcessWindowFunction[
          AlarmasOut, AlarmasOut, String, TimeWindow
        ]() {
          override def process(key: String,
                               context: Context,
                               elements: Iterable[AlarmasOut],
                               out: Collector[AlarmasOut]): Unit = {
            val lst = elements.toList
            lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address,x.c_country,lst.size,x.c_addr)))
          }
        })
      .keyBy(_.address).window(TumblingProcessingTimeWindows.of(Time.seconds(1))) // then key by address
      .process(new ProcessWindowFunction[
        AlarmasOut, AlarmasOut, String, TimeWindow
      ]() {
        override def process(key: String,
                             context: Context,
                             elements: Iterable[AlarmasOut],
                             out: Collector[AlarmasOut]): Unit = {
          val lst = elements.toList
          lst.foreach(x => out.collect(AlarmasOut(x.id, x.country, x.city,x.address,x.c_country,x.c_city,lst.size)))
        }
      })
      .print()

/// CASE CLASS
 case class AlarmasIn(
                      id: Int,
                      country: String,
                      city: String,
                      address: String
                    )

  case class AlarmasOut(
                       id: Int,
                       country: String,
                       city: String,
                       address: String,
                       c_country: Int,
                       c_city: Int,
                       c_addr: Int
                     )

Answer

As city is a subcategory of country, you can aggregate the stream by the city dimension first, and then do another aggregation by the country dimension.

val parsed = stream2.map(x => {
      val arr = x.split(",")
      (arr(0).toInt, arr(1), arr(2)) // (id, country, city)
    })

    parsed
      .keyBy(x => x._3) // key by city first
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new ProcessWindowFunction[
        (Int, String, String), (Int, String, String, Int), String, TimeWindow
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String)],
                             out: Collector[(Int, String, String, Int)]): Unit = {
          val lst = elements.toList
          // emit each element together with the per-city count
          lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
        }
      })
      .keyBy(x => x._2) // then key by country
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1))) // a window is needed to apply a ProcessWindowFunction on the chained keyBy
      .process(new ProcessWindowFunction[
        (Int, String, String, Int), (Int, String, String, Int), String, TimeWindow
      ]() {
        override def process(key: String, context: Context,
                             elements: Iterable[(Int, String, String, Int)],
                             out: Collector[(Int, String, String, Int)]): Unit = {
          // accumulate the counts carried by the elements from the previous (city) aggregation
          var cnt = 0
          for (e <- elements) {
            cnt += e._4
          }
          elements.foreach(x => out.collect((x._1, x._2, x._3, cnt)))
        }
      }).print().setParallelism(1)

If one dimension is not a sub-dimension of the other, you can concatenate the two dimensions to generate a new key, and then implement the aggregation logic yourself in the process function.

keyBy(x => x._2 + x._3)
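
As an aside (not from the original answer): keying by a Scala tuple gives the same composite grouping while avoiding accidental collisions that plain string concatenation can produce (e.g. "AB" + "C" vs "A" + "BC"):

parsed.keyBy(x => (x._2, x._3)) // composite key (country, city) on the parsed tuple stream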

UPDATE

I don't think it is possible to calculate the result in one process function, because you are trying to compute statistics over different keys. The only way to do it in one step is to set the global parallelism to 1 (all input data will then go to one downstream task, even if you use a keyBy function) or to broadcast the input data to all downstream tasks.
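
For illustration only, here is a minimal sketch of the "single downstream task" idea, assuming the AlarmasIn/AlarmasOut case classes from UPDATE 2: route every record to one constant key so a single window instance sees the whole batch and can count all three dimensions at once, at the cost of parallelism for that operator.

parsed
  .keyBy(_ => 0) // constant key: every record goes to the same downstream task
  .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
  .process(new ProcessWindowFunction[AlarmasIn, AlarmasOut, Int, TimeWindow]() {
    override def process(key: Int, context: Context,
                         elements: Iterable[AlarmasIn],
                         out: Collector[AlarmasOut]): Unit = {
      val lst = elements.toList
      // group once per dimension, then look up each record's counts
      val byCountry = lst.groupBy(_.country)
      val byCity    = lst.groupBy(_.city)
      val byAddr    = lst.groupBy(_.address)
      lst.foreach { x =>
        out.collect(AlarmasOut(x.id, x.country, x.city, x.address,
          byCountry(x.country).size, byCity(x.city).size, byAddr(x.address).size))
      }
    }
  })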

Since your calculation actually has some common processing logic, it would be better to do some abstraction.

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object CountJob {

  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val transactions: DataStream[Record] = env
      .addSource(new SourceFunction[Record] {
        override def run(sourceContext: SourceFunction.SourceContext[Record]): Unit = {
          while (true) {
            sourceContext.collect(Record(1, "a", "b", "c", 1, 1, 1))
            Thread.sleep(1000)
          }
        }

        override def cancel(): Unit = {

        }
      })
      .name("generate source")

    transactions.keyBy(_.addr)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("ADDR"))
      .keyBy(_.city)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("CITY"))
      .keyBy(_.country)
      .timeWindow(Time.seconds(1))
      .process(new CustomCountProc("COUNTRY"))
      .print()


    env.execute("Count Job")
  }
}

// a common operator to process different aggregation
class CustomCountProc(aggrType: String) extends ProcessWindowFunction[Record, Record, String, TimeWindow] {

  override def process(key: String, context: Context, elements: Iterable[Record], out: Collector[Record]): Unit = {

    for (e <- elements) {
      if ("ADDR".equals(aggrType)) {
        out.collect(Record(-1, e.country, e.city, key, e.country_cnt, e.city_cnt, elements.size))
      }
      else if ("CITY".equals(aggrType)) {
        out.collect(Record(-1, e.country, key, e.addr, e.country_cnt, elements.size, e.addr_cnt))
      }
      else if ("COUNTRY".equals(aggrType)) {
        out.collect(Record(-1, key, e.city, e.addr, elements.size, e.city_cnt, e.addr_cnt))
      }
    }

  }
}

case class Record(
                   id: Int,
                   country: String,
                   city: String,
                   addr: String,
                   country_cnt: Int,
                   city_cnt: Int,
                   addr_cnt: Int
                 ) {
}

By the way, I am not sure the output actually meets your expectation. Since you didn't implement a stateful process function, I think you are computing the aggregation results per batch of data, where each batch contains the data ingested within a one-second time window. The output won't accumulate over time; each batch starts from zero.
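
If an accumulating count is actually wanted, one option (a sketch, not part of the original answer; the class and descriptor names are illustrative) is to keep a running total in the per-key global state exposed by the ProcessWindowFunction context:

import org.apache.flink.api.common.state.ValueStateDescriptor

// keeps a running per-key (e.g. per-country) count across successive windows
class RunningCountProc extends ProcessWindowFunction[Record, (String, Long), String, TimeWindow] {

  private val totalDesc =
    new ValueStateDescriptor[java.lang.Long]("total-count", classOf[java.lang.Long])

  override def process(key: String, context: Context,
                       elements: Iterable[Record],
                       out: Collector[(String, Long)]): Unit = {
    val total    = context.globalState.getState(totalDesc) // per-key state, retained across windows
    val previous = Option(total.value()).map(_.longValue()).getOrElse(0L)
    val updated  = previous + elements.size
    total.update(updated)
    out.collect((key, updated)) // running count for this key so far
  }
}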

When using the timeWindow function, you also need to pay attention to the TimeCharacteristic, which defaults to processing time.
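
For example, in the Flink versions that still expose timeWindow, event time would be enabled on the environment (and the stream would then also need timestamps and watermarks assigned):

import org.apache.flink.streaming.api.TimeCharacteristic

// switch the job from the default processing time to event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)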

The output may also be delayed because of the three consecutive window functions. Suppose the first process function completes its aggregation within one second and forwards the results downstream; since the second process function also has a one-second time window, it won't emit any result until it receives the next batch of output from upstream.

Let's see if others have a better solution to your problem.
