Number of input rows in Spark Structured Streaming with a custom sink


Question

I'm using a custom sink in Structured Streaming (Spark 2.2.0) and noticed that Spark reports an incorrect input-row metric: numInputRows is always zero.

This is how I build the stream:

StreamingQuery writeStream = session
            .readStream()
            .schema(RecordSchema.fromClass(TestRecord.class))
            .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
            .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
            .csv(s3Path.toString())
            .as(Encoders.bean(TestRecord.class))
            .flatMap(
                ((FlatMapFunction<TestRecord, TestOutputRecord>) (u) -> {
                    List<TestOutputRecord> list = new ArrayList<>();
                    try {
                        TestOutputRecord result = transformer.convert(u);
                        list.add(result);
                    } catch (Throwable t) {
                        System.err.println("Failed to convert a record");
                        t.printStackTrace();
                    }

                    return list.iterator();
                }),
                Encoders.bean(TestOutputRecord.class))
        .map(new DataReinforcementMapFunction<>(), Encoders.bean(TestOutputRecord.class))
        .writeStream()
        .trigger(Trigger.ProcessingTime(WRITE_FREQUENCY, TimeUnit.SECONDS))
        .format(MY_WRITER_FORMAT)
        .outputMode(OutputMode.Append())
        .queryName("custom-sink-stream")
        .start();

writeStream.processAllAvailable();
writeStream.stop();

Log output:

Streaming query made progress: {
  "id" : "a8a7fbc2-0f06-4197-a99a-114abae24964",
  "runId" : "bebc8a0c-d3b2-4fd6-8710-78223a88edc7",
  "name" : "custom-sink-stream",
  "timestamp" : "2018-01-25T18:39:52.949Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 781,
    "triggerExecution" : 781
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[s3n://test-bucket/test]",
    "startOffset" : {
      "logOffset" : 0
    },
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "com.mycompany.spark.MySink@f82a99"
  }
}

Do I have to populate any metrics in my custom sink to be able to track progress? Or could it be a problem in FileStreamSource when it reads from an S3 bucket?

Answer

The problem was caused by calling dataset.rdd in my custom sink: it builds a new query plan, so StreamExecution doesn't know about the work being done and therefore cannot collect metrics.

Replacing data.rdd.mapPartitions with data.queryExecution.toRdd.mapPartitions fixed the issue.
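
For context, here is a minimal sketch of what such a sink can look like against Spark 2.2's (internal, Scala) Sink API. The class names, the shortName, and the write logic are hypothetical, not the poster's actual MySink; the relevant part is the addBatch body, which iterates the batch through the streaming query's existing physical plan via queryExecution.toRdd instead of data.rdd:

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

// Hypothetical sink illustrating the fix described above.
class MySink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // data.rdd would deserialize rows through a brand-new plan, detaching
    // the work from StreamExecution, so numInputRows stays at zero.
    // queryExecution.toRdd reuses the query's own physical plan, so the
    // rows are attributed to the streaming query's metrics.
    data.queryExecution.toRdd.mapPartitions { rows =>
      var written = 0L
      rows.foreach { row =>
        // InternalRow instances may be reused by Spark; copy before buffering.
        val safeRow = row.copy()
        // ... write safeRow to the external store here ...
        written += 1
      }
      Iterator.single(written)
    }.count() // mapPartitions is lazy; an action forces the writes to run
  }
}

// In Spark 2.2 a custom sink is wired up through StreamSinkProvider,
// matching the .format(MY_WRITER_FORMAT) call in the question.
class MySinkProvider extends StreamSinkProvider with DataSourceRegister {
  override def shortName(): String = "my-sink"

  override def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = new MySink
}

With queryExecution.toRdd in place, the numInputRows field in the streaming progress report should reflect the actual size of each processed batch.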

