How do I join two streams in Apache Flink?

Problem description

I am getting started with Flink and am looking at one of the official tutorials.

As I understand it, the goal of this exercise is to join the two streams on the time attribute.

Task:

The result of this exercise is a data stream of Tuple2 records, one for each distinct rideId. You should ignore the END events, and only join the event for the START of each ride with its corresponding fare data.

The resulting stream should be printed to standard out.

Question: How is the EnrichmentFunction able to join the two streams? In other words, how does it know which fare to join with which ride? I expected it to buffer multiple fares/rides until an incoming fare/ride has a matching partner.

As I understand it, it just saves every ride/fare it sees and combines it with the next best ride/fare. Why is this a proper join?

Provided solution:

/*
 * Copyright 2017 data Artisans GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dataartisans.flinktraining.solutions.datastream_java.state;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiFare;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiFareSource;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * Java reference implementation for the "Stateful Enrichment" exercise of the Flink training
 * (http://training.data-artisans.com).
 *
 * The goal for this exercise is to enrich TaxiRides with fare information.
 *
 * Parameters:
 * -rides path-to-input-file
 * -fares path-to-input-file
 *
 */
public class RidesAndFaresSolution extends ExerciseBase {
    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final String ridesFile = params.get("rides", pathToRideData);
        final String faresFile = params.get("fares", pathToFareData);

        final int delay = 60;                   // at most 60 seconds of delay
        final int servingSpeedFactor = 1800;    // 30 minutes worth of events are served every second

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(ExerciseBase.parallelism);

        DataStream<TaxiRide> rides = env
                .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
                .filter((TaxiRide ride) -> ride.isStart)
                .keyBy("rideId");

        DataStream<TaxiFare> fares = env
                .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
                .keyBy("rideId");

        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
                .connect(fares)
                .flatMap(new EnrichmentFunction());

        printOrTest(enrichedRides);

        env.execute("Join Rides with Fares (java RichCoFlatMap)");
    }

    public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
        // keyed, managed state
        private ValueState<TaxiRide> rideState;
        private ValueState<TaxiFare> fareState;

        @Override
        public void open(Configuration config) {
            rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
            fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
        }

        @Override
        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiFare fare = fareState.value();
            if (fare != null) {
                fareState.clear();
                out.collect(new Tuple2<>(ride, fare));
            } else {
                rideState.update(ride);
            }
        }

        @Override
        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            TaxiRide ride = rideState.value();
            if (ride != null) {
                rideState.clear();
                out.collect(new Tuple2<>(ride, fare));
            } else {
                fareState.update(fare);
            }
        }
    }
}

Recommended answer

In the context of this particular training exercise on stateful enrichment, there are three events for each value of rideId -- a TaxiRide start event, a TaxiRide end event, and a TaxiFare. The objective of this exercise is to connect each TaxiRide start event with the one TaxiFare event having the same rideId -- or in other words, to join the ride stream and fare stream on rideId, while knowing that there will be only one of each.

This exercise is demonstrating how keyed state works in Flink. Keyed state is effectively a sharded key-value store. When we have an item of ValueState, such as ValueState<TaxiRide> rideState, Flink will store a separate record in its state backend for each distinct value of the key (the rideId).
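
One way to picture a "sharded key-value store" is the plain-Java sketch below. It is a toy model, not Flink code: the class name ShardedStateSketch, the modulo-based shardFor rule, and the use of HashMap are all assumptions made for illustration (Flink's actual routing goes through key groups and a configurable state backend). The point it tries to show is only that each key lives in exactly one shard and has its own slot there.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of keyed state as a sharded key-value store (not Flink's implementation).
class ShardedStateSketch {
    private final List<Map<Long, String>> shards = new ArrayList<>();

    ShardedStateSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            shards.add(new HashMap<>());
        }
    }

    // Hypothetical routing rule: every event with the same rideId maps to the same shard.
    int shardFor(long rideId) {
        return Math.floorMod(Long.hashCode(rideId), shards.size());
    }

    // Stores a value in the slot belonging to this rideId.
    void update(long rideId, String value) {
        shards.get(shardFor(rideId)).put(rideId, value);
    }

    // Returns the value stored for this rideId, or null if there is none.
    String value(long rideId) {
        return shards.get(shardFor(rideId)).get(rideId);
    }

    int entriesInShard(int shard) {
        return shards.get(shard).size();
    }

    public static void main(String[] args) {
        ShardedStateSketch state = new ShardedStateSketch(4);
        state.update(1L, "ride-1");
        state.update(2L, "ride-2");
        System.out.println(state.value(1L)); // ride-1
        System.out.println(state.value(3L)); // null, nothing stored for rideId 3
    }
}
```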

Each time flatMap1 and flatMap2 are called there is a key (a rideId) implicitly in context, and when we call rideState.update(ride) or rideState.value() we are not accessing a single variable, but rather setting and getting an entry in a key-value store, using the rideId as the key.
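
The "implicit key" can be modelled in plain Java roughly as follows. This is a simplified sketch rather than Flink's API: KeyedValueSketch and setCurrentKey are hypothetical names standing in for what the runtime does before it invokes flatMap1 or flatMap2.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of ValueState<T>: one handle, but every read and write
// is scoped to whichever key is currently in context.
class KeyedValueSketch<T> {
    private final Map<Long, T> entries = new HashMap<>();
    private long currentKey;

    // Hypothetical hook: in Flink the runtime derives the key from the incoming record.
    void setCurrentKey(long key) {
        currentKey = key;
    }

    T value() {
        return entries.get(currentKey);
    }

    void update(T v) {
        entries.put(currentKey, v);
    }

    void clear() {
        entries.remove(currentKey);
    }

    public static void main(String[] args) {
        KeyedValueSketch<String> rideState = new KeyedValueSketch<>();

        rideState.setCurrentKey(7L);           // a record with rideId 7 arrives
        rideState.update("ride 7");

        rideState.setCurrentKey(8L);           // a record with rideId 8 arrives
        System.out.println(rideState.value()); // null, nothing stored under key 8

        rideState.setCurrentKey(7L);           // another record with rideId 7 arrives
        System.out.println(rideState.value()); // ride 7
    }
}
```

The same rideState handle returns different values depending on the key in context, which is why the solution never has to look up a rideId explicitly.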

In this exercise, both streams are keyed by the rideId, so there is potentially one element of rideState and one element of fareState for each distinct rideId. Hence the solution that's been provided is buffering lots of rides and fares, but only one for each rideId (which is enough, given that the rides and fares are perfectly paired in this dataset).
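
To see why one slot per rideId is enough, here is a plain-Java walk-through of the same matching logic, with two maps standing in for rideState and fareState. The class JoinSketch, the string payloads, and the method names onRide and onFare are assumptions made for illustration; only the buffer-or-emit logic mirrors the EnrichmentFunction shown in the question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the enrichment join: buffer whichever side arrives first,
// emit the pair when the other side with the same rideId shows up.
class JoinSketch {
    private final Map<Long, String> rideState = new HashMap<>();
    private final Map<Long, String> fareState = new HashMap<>();
    final List<String> output = new ArrayList<>();

    // Mirrors flatMap1: emit if the fare for this rideId is already buffered, else buffer the ride.
    void onRide(long rideId, String ride) {
        String fare = fareState.remove(rideId);
        if (fare != null) {
            output.add(ride + "+" + fare);
        } else {
            rideState.put(rideId, ride);
        }
    }

    // Mirrors flatMap2: emit if the ride for this rideId is already buffered, else buffer the fare.
    void onFare(long rideId, String fare) {
        String ride = rideState.remove(rideId);
        if (ride != null) {
            output.add(ride + "+" + fare);
        } else {
            fareState.put(rideId, fare);
        }
    }

    // Number of events still waiting for their partner.
    int buffered() {
        return rideState.size() + fareState.size();
    }

    public static void main(String[] args) {
        JoinSketch join = new JoinSketch();
        join.onRide(1L, "ride1"); // buffered, no fare for rideId 1 yet
        join.onFare(2L, "fare2"); // buffered, no ride for rideId 2 yet
        join.onFare(1L, "fare1"); // matches the buffered ride 1
        join.onRide(2L, "ride2"); // matches the buffered fare 2
        System.out.println(join.output); // [ride1+fare1, ride2+fare2]
    }
}
```

Events for different rideIds can interleave in any order without being mixed up, because each lookup only ever touches the slot for its own key.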

So, you asked:

How is the EnrichmentFunction able to join the two streams aka. how does it know which fare to join with which ride?

The answer is:

It joins each ride with the fare that has the same rideId.

This particular exercise you've asked about shows how to implement a simple enrichment join for the purpose of getting across the ideas of keyed state, and connected streams. But more complex joins are certainly possible with Flink. See the docs on joins using the DataStream API, joins with Flink's Table API, and joins with Flink SQL.
