How to get the number of records written (using DataFrameWriter's save operation)?


Problem Description

Is there any way to get the number of records written when using Spark to save records? While I know it isn't in the spec currently, I'd like to be able to do something like:

val count = df.write.csv(path)

Alternatively, being able to do an inline count (preferably without just using a standard accumulator) of the results of a step would be (almost) as effective, i.e.:

dataset.countTo(count_var).filter({function}).countTo(filtered_count_var).collect()
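For reference, the closest built-in approximation to the hypothetical countTo uses a standard LongAccumulator, which is exactly the workaround the question hopes to avoid. A sketch only (the accumulator name and someFunction are mine; an active SparkSession spark and a Dataset dataset are assumed):

```scala
import spark.implicits._  // encoders for map over common row types

// Sketch: count rows flowing past a point by side effect.
// Accumulator values are only reliable after the action (collect) completes,
// and can over-count if tasks are retried.
val acc = spark.sparkContext.longAccumulator("filtered_count")

val result = dataset
  .filter(row => someFunction(row))   // the {function} from the question
  .map { row => acc.add(1); row }     // increment per surviving row
  .collect()

val filteredCountVar = acc.value
```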

Any ideas?

Recommended Answer

I would use a SparkListener, which can intercept the onTaskEnd and onStageCompleted events that you can use to access task metrics.

Task metrics give you the accumulators Spark uses to display metrics in the SQL tab (in the Details for Query page).

For example, the following query:

spark.
  read.
  option("header", true).
  csv("../datasets/people.csv").
  limit(10).
  write.
  csv("people")

gives exactly 10 output rows, so Spark knows the count (and so could you).
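As a sketch of the SparkListener approach (the class and field names are mine, not from the answer), a listener can sum recordsWritten from each finished task's output metrics in onTaskEnd:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper: accumulate recordsWritten across all tasks of a write job.
class RecordsWrittenListener extends SparkListener {
  val total = new AtomicLong(0)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      total.addAndGet(metrics.outputMetrics.recordsWritten)
    }
  }
}

// Usage, assuming an active SparkSession `spark` and a DataFrame `df`:
// val listener = new RecordsWrittenListener
// spark.sparkContext.addSparkListener(listener)
// df.write.csv(path)
// val count = listener.total.get
```

Note the total is per listener, not per query, so a fresh listener (or a reset) is needed for each write you want to measure.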

You could also explore Spark SQL's QueryExecutionListener:

The interface of query execution listener that can be used to analyze execution metrics.

You can register a QueryExecutionListener using ExecutionListenerManager, which is available as spark.listenerManager.

scala> :type spark.listenerManager
org.apache.spark.sql.util.ExecutionListenerManager

scala> spark.listenerManager.
clear   clone   register   unregister
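A minimal sketch of such a listener (the class name and printed message are my own; the onSuccess/onFailure signatures come from the QueryExecutionListener interface):

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Sketch: report the numOutputRows metric after each successful action.
class WriteCountListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // Not every physical plan exposes numOutputRows, hence the Option.
    qe.executedPlan.metrics.get("numOutputRows").foreach { m =>
      println(s"$funcName wrote ${m.value} rows")
    }
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

// spark.listenerManager.register(new WriteCountListener)
```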

I think it's closer to the "bare metal", but I haven't used it before.

@D3V (in the comments section) mentioned accessing the numOutputRows SQL metric using the QueryExecution of a structured query. Something worth considering.

scala> :type q
org.apache.spark.sql.DataFrame

scala> :type q.queryExecution.executedPlan.metrics
Map[String,org.apache.spark.sql.execution.metric.SQLMetric]

q.queryExecution.executedPlan.metrics("numOutputRows").value

