Apache Flink:运行许多作业时的性能问题 [英] Apache Flink: Performance issue when running many jobs

查看:119
本文介绍了Apache Flink:运行许多作业时的性能问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果Flink SQL查询数量众多(以下100个),则Flink命令行客户端将在Yarn群集上失败,并显示"JobManager在600000毫秒内未响应",即该作业从未在该群集上启动.

With a high number of Flink SQL queries (100 of below), the Flink command line client fails with a "JobManager did not respond within 600000 ms" on a Yarn cluster, i.e. the job is never started on the cluster.

  • 在最后一个TaskManager启动之后,JobManager日志什么都没有,除了 调试日志与"ID为5cd95f89ed7a66ec44f2d19eca0592f7的作业不 在JobManager中找到",表明其可能卡住了(创建 ExecutionGraph?).
  • 与本地独立Java程序相同 (最初是高CPU)
  • 注意:structStream中的每一行都包含515 列(许多最终为空),包括具有原始列的列 信息.
  • 在YARN群集中,我们为TaskManager指定18GB,为18GB 对于JobManager,每个5个插槽,并行度为725(分区 在我们的Kafka资料中).
  • JobManager logs has nothing after the last TaskManager started except DEBUG logs with "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in JobManager", indicating its likely stuck (creating the ExecutionGraph?).
  • The same works as standalone java program locally (high CPU initially)
  • Note: Each Row in structStream contains 515 columns (many end up null) including a column that has the raw message.
  • In the YARN cluster we specify 18GB for TaskManager, 18GB for the JobManager, 5 slots each and parallelism of 725 (partitions in our Kafka source).
select count (*), 'idnumber' as criteria, Environment, CollectedTimestamp, 
       EventTimestamp, RawMsg, Source 
from structStream
where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType' 
      and Outcome='Success'
group by tumble(proctime, INTERVAL '1' SECOND), Environment, 
         CollectedTimestamp, EventTimestamp, RawMsg, Source

代码

public static void main(String[] args) throws Exception {
    FileSystems.newFileSystem(KafkaReadingStreamingJob.class
                             .getResource(WHITELIST_CSV).toURI(), new HashMap<>());

    final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
    final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment);

    final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
    tableEnv.registerDataStream("structStream", structStream);
    tableEnv.scan("structStream").printSchema();

    for (int i = 0; i < 100; i++) {
        for (String query : Queries.sample) {
            // Queries.sample has one query that is above. 
            Table selectQuery = tableEnv.sqlQuery(query);

            DataStream<Row> selectQueryStream =                                                 
                               tableEnv.toAppendStream(selectQuery, Row.class);
            selectQueryStream.print();
        }
    }

    // execute program
    streamingEnvironment.execute("Kafka Streaming SQL");
}

private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
    Properties properties = getKafkaProperties();

    // TestDeserializer deserializes the JSON to a ROW of string columns (515)
    // and also adds a column for the raw message. 
    FlinkKafkaConsumer011 consumer = new         
         FlinkKafkaConsumer011(KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties);
    DataStream<Row> stream = environment.addSource(consumer);

    return stream;
}

private static RowTypeInfo getRowTypeInfo() throws Exception {
    // This has 515 fields. 
    List<String> fieldNames = DDIManager.getDDIFieldNames();
    fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer
    fieldNames.add("proctime");

    // Fill typeInformationArray with StringType to all but the last field which is of type Time
    .....
    return new RowTypeInfo(typeInformationArray, fieldNamesArray);
}

private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
    final StreamExecutionEnvironment env =                      
    StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

    env.enableCheckpointing(60000);
    env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
    env.setParallelism(725);
    return env;
}

private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
    Properties properties = getKafkaProperties();

    // TestDeserializer deserializes the JSON to a ROW of string columns (515)
    // and also adds a column for the raw message. 
    FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011(KAFKA_TOPIC_TO_CONSUME, new  TestDeserializer(getRowTypeInfo()), properties);
    DataStream<Row> stream = environment.addSource(consumer);

    return stream;
}

private static RowTypeInfo getRowTypeInfo() throws Exception {
    // This has 515 fields. 
    List<String> fieldNames = DDIManager.getDDIFieldNames();
    fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer
    fieldNames.add("proctime");

    // Fill typeInformationArray with StringType to all but the last field which is of type Time
    .....
    return new RowTypeInfo(typeInformationArray, fieldNamesArray);
}

private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
    final StreamExecutionEnvironment env =     StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

    env.enableCheckpointing(60000);
    env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
    env.setParallelism(725);
    return env;
}

推荐答案

在我看来,这似乎是JobManager重载了太多正在同时运行的作业.我建议将作业分配给更多的JobManagers/Flink群集.

This looks to me as if the JobManager is overloaded with too many concurrently running jobs. I'd suggest to distribute the jobs to more JobManagers / Flink clusters.

这篇关于Apache Flink:运行许多作业时的性能问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆