How do I join two streams in apache flink?


Problem description

I am getting started with flink and having a look at one of the official tutorials.

To my understanding the goal of this exercise is to join the two streams on the time attribute.

The task:

The result of this exercise is a data stream of Tuple2 records, one for each distinct rideId. You should ignore the END events, and only join the event for the START of each ride with its corresponding fare data.

The resulting stream should be printed to standard out.

Question: How is the EnrichmentFunction able to join the two streams, i.e., how does it know which fare to join with which ride? I expected it to buffer multiple fares/rides until a matching partner arrives for an incoming fare/ride.

In my understanding it just saves every ride/fare it sees and combines it with the next best ride/fare. Why is this a proper join?

Provided solution:

/*
 * Copyright 2017 data Artisans GmbH
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *  http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dataartisans.flinktraining.solutions.datastream_java.state;

import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiFare;
import com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiFareSource;
import com.dataartisans.flinktraining.exercises.datastream_java.sources.TaxiRideSource;
import com.dataartisans.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

/**
 * Java reference implementation for the "Stateful Enrichment" exercise of the Flink training
 * (http://training.data-artisans.com).
 *
 * The goal for this exercise is to enrich TaxiRides with fare information.
 *
 * Parameters:
 * -rides path-to-input-file
 * -fares path-to-input-file
 *
 */
public class RidesAndFaresSolution extends ExerciseBase {
    public static void main(String[] args) throws Exception {

        ParameterTool params = ParameterTool.fromArgs(args);
        final String ridesFile = params.get("rides", pathToRideData);
        final String faresFile = params.get("fares", pathToFareData);

        final int delay = 60;                   // at most 60 seconds of delay
        final int servingSpeedFactor = 1800;    // 30 minutes worth of events are served every second

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(ExerciseBase.parallelism);

        DataStream<TaxiRide> rides = env
                .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
                .filter((TaxiRide ride) -> ride.isStart)
                .keyBy("rideId");

        DataStream<TaxiFare> fares = env
                .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
                .keyBy("rideId");

        DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
                .connect(fares)
                .flatMap(new EnrichmentFunction());

        printOrTest(enrichedRides);

        env.execute("Join Rides with Fares (java RichCoFlatMap)");
    }

    public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {
        // keyed, managed state
        private ValueState<TaxiRide> rideState;
        private ValueState<TaxiFare> fareState;

        @Override
        public void open(Configuration config) {
            rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));
            fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));
        }

        @Override
        public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            // Called for each ride START event; the current rideId is implicit in the keyed context.
            TaxiFare fare = fareState.value();
            if (fare != null) {
                // The matching fare arrived first: emit the joined pair and clear the buffered fare.
                fareState.clear();
                out.collect(new Tuple2<>(ride, fare));
            } else {
                // No fare seen yet for this rideId: buffer the ride until the fare arrives.
                rideState.update(ride);
            }
        }

        @Override
        public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
            // Called for each fare event; the current rideId is implicit in the keyed context.
            TaxiRide ride = rideState.value();
            if (ride != null) {
                // The matching ride arrived first: emit the joined pair and clear the buffered ride.
                rideState.clear();
                out.collect(new Tuple2<>(ride, fare));
            } else {
                // No ride seen yet for this rideId: buffer the fare until the ride arrives.
                fareState.update(fare);
            }
        }
    }
}

Recommended answer

In the context of this particular training exercise on stateful enrichment, there are three events for each value of rideId -- a TaxiRide start event, a TaxiRide end event, and a TaxiFare. The objective of this exercise is to connect each TaxiRide start event with the one TaxiFare event having the same rideId -- or in other words, to join the ride stream and fare stream on rideId, while knowing that there will be only one of each.

This exercise is demonstrating how keyed state works in Flink. Keyed state is effectively a sharded key-value store. When we have an item of ValueState, such as ValueState<TaxiRide> rideState, Flink will store a separate record in its state backend for each distinct value of the key (the rideId).

Each time flatMap1 and flatMap2 are called there is a key (a rideId) implicitly in context, and when we call rideState.update(ride) or rideState.value() we are not accessing a single variable, but rather setting and getting an entry in a key-value store, using the rideId as the key.
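
To make the "sharded key-value store" picture concrete, here is a rough analogy only (not Flink's actual state backend): a ValueState<TaxiRide> on a stream keyed by rideId behaves much like a map from rideId to TaxiRide that Flink maintains for you, except that value(), update() and clear() always operate on the entry for the key of the element currently being processed. The class and values below are made up purely for illustration.

import java.util.HashMap;
import java.util.Map;

// Conceptual analogy only: per-key ValueState viewed as a plain map keyed by rideId.
public class KeyedStateAnalogy {
    // one entry per distinct rideId; a String stands in for the buffered TaxiRide
    static final Map<Long, String> rideState = new HashMap<>();

    public static void main(String[] args) {
        long currentKey = 42L;                          // the rideId of the element being processed
        rideState.put(currentKey, "ride #42");          // roughly what rideState.update(ride) does
        System.out.println(rideState.get(currentKey));  // roughly what rideState.value() does
        rideState.remove(currentKey);                   // roughly what rideState.clear() does
    }
}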

In this exercise, both streams are keyed by the rideId, so there is potentially one element of rideState and one element of fareState for each distinct rideId. Hence the solution that's been provided is buffering lots of rides and fares, but only one for each rideId (which is enough, given that the rides and fares are perfectly paired in this dataset).

So, you asked:

How is the EnrichmentFunction able to join the two streams, i.e., how does it know which fare to join with which ride?

The answer is:

It joins the fare having the same rideId.

This particular exercise you've asked about shows how to implement a simple enrichment join for the purpose of getting across the ideas of keyed state, and connected streams. But more complex joins are certainly possible with Flink. See the docs on joins using the DataStream API, joins with Flink's Table API, and joins with Flink SQL.
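
For comparison, here is a minimal sketch of what a DataStream API interval join over the same two streams could look like, reusing the rides and fares streams from the solution above. The five-minute bounds and the lambda key selectors are assumptions for illustration, not something the exercise prescribes; this variant requires event time with timestamps and watermarks (which the training sources provide), plus imports for ProcessJoinFunction and Time.

DataStream<Tuple2<TaxiRide, TaxiFare>> intervalJoined = rides
        .keyBy((TaxiRide ride) -> ride.rideId)
        .intervalJoin(fares.keyBy((TaxiFare fare) -> fare.rideId))
        .between(Time.minutes(-5), Time.minutes(5))   // accept a fare up to 5 minutes before or after the ride
        .process(new ProcessJoinFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>>() {
            @Override
            public void processElement(TaxiRide ride, TaxiFare fare, Context ctx,
                                       Collector<Tuple2<TaxiRide, TaxiFare>> out) {
                out.collect(new Tuple2<>(ride, fare));
            }
        });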
