Apache Flink的吞吐量和延迟 [英] Throughput and Latency on Apache Flink

查看:1530
本文介绍了Apache Flink的吞吐量和延迟的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我为Apache Flink编写了一个非常简单的Java程序,现在我对测量统计量感兴趣,例如吞吐量(每秒处理的元组数)和等待时间(程序需要处理每个输入元组的时间).

I have written a very simple java program for Apache Flink and now I am interested in measuring statistics such as throughput (number of tuples processed per second) and latency (the time the program needs to process every input tuple).

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
        .map(new MyMapper().writeAsCsv("/home/LizardKing/Results.csv");

JobExecutionResult res = env.execute();

我知道Flink公开了一些指标:

I know that Flink exposes some metrics:

https://ci.apache .org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html

但是我不确定如何使用它们来获取我想要的东西.从链接中,我已经了解到可以使用仪表"来测量平均吞吐量,但是在定义后,我应该如何使用它?

But I am not sure how to use them in order to obtain what I want. From the link I have read that a "meter" can be used to measure the average throughput but, after having defined it, how should I use it?

推荐答案

我们正在运行在纱线上运行的生产流作业中的自定义指标(例如仪表,仪表).

We are running custom metrics like meter, gauge in our production streaming job running on yarn .

以下是步骤:

pom.xml的其他依赖项

Additional dependency to pom.xml

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-metrics-dropwizard</artifactId>
    <version>${flink.version}</version>
</dependency>

我们正在使用1.2.1版

We are using version 1.2.1

然后将仪表添加到MyMapper类.

Then add meter to MyMapper class .

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Meter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class Test {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
                .readTextFile("/home/LizardKing/Documents/Power/Prova.csv")
                .map(new MyMapper())
                .writeAsCsv("/home/LizardKing/Results.csv");

        JobExecutionResult res = env.execute();
    }


    private static class MyMapper extends RichMapFunction<String, Object> {

        private transient Meter meter;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.meter = getRuntimeContext()
                    .getMetricGroup()
                    .meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
        }

        @Override
        public Object map(String value) throws Exception {    
            this.meter.markEvent();
            return value;
        }
    }
}

希望这会有所帮助.

这篇关于Apache Flink的吞吐量和延迟的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆