How to get the number of records written (using DataFrameWriter's save operation)?
Problem description
Is there any way to get the number of records written when using Spark to save records? While I know it isn't in the spec currently, I'd like to be able to do something like:
val count = df.write.csv(path)
Alternatively, being able to do an inline count (preferably without just using a standard accumulator) of the results of a step would be (almost) as effective, i.e.:
dataset.countTo(count_var).filter({function}).countTo(filtered_count_var).collect()
Any ideas?
Recommended answer
I would use a SparkListener that can intercept the onTaskEnd or onStageCompleted events, which give you access to task metrics.
Task metrics give you the accumulators Spark uses to display metrics in the SQL tab (in Details for Query).
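As a concrete illustration, here is a minimal sketch of such a listener that sums up `recordsWritten` from each finished task's output metrics. It assumes Spark 2.x+ (where `outputMetrics` is always present on `TaskMetrics`); the class name `RecordsWrittenListener` is made up for this example:

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sketch: accumulate recordsWritten across all tasks as they finish.
class RecordsWrittenListener extends SparkListener {
  val recordsWritten = new AtomicLong(0L)

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for some failed tasks, so guard with Option
    Option(taskEnd.taskMetrics).foreach { metrics =>
      recordsWritten.addAndGet(metrics.outputMetrics.recordsWritten)
    }
  }
}

// Usage: register before the write, read the total after it completes.
// val listener = new RecordsWrittenListener
// spark.sparkContext.addSparkListener(listener)
// df.write.csv(path)
// val count = listener.recordsWritten.get
```

Note the counter covers every job that runs while the listener is registered, so for an isolated count you would register it just before the write and remove it afterwards.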
For example, the following query:
spark.
read.
option("header", true).
csv("../datasets/people.csv").
limit(10).
write.
csv("people")
gives exactly 10 output rows, so Spark knows it (and you could too).
You could also explore Spark SQL's QueryExecutionListener:
The interface of query execution listener that can be used to analyze execution metrics.
You can register a QueryExecutionListener using ExecutionListenerManager, which is available as spark.listenerManager.
scala> :type spark.listenerManager
org.apache.spark.sql.util.ExecutionListenerManager
scala> spark.listenerManager.
clear clone register unregister
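A sketch of registering such a listener that reports the `numOutputRows` metric when a query finishes successfully (note the `onFailure` signature varies across Spark versions, taking an `Exception` in 2.x and a `Throwable` in 3.x; the 2.x form is assumed here):

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Sketch: print numOutputRows (if the top operator exposes it)
// after each successful query execution.
val listener = new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    qe.executedPlan.metrics.get("numOutputRows").foreach { metric =>
      println(s"$funcName produced ${metric.value} output rows")
    }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

spark.listenerManager.register(listener)
```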
I think it's closer to the "bare metal", but I haven't used it before.
@D3V (in the comments section) mentioned accessing the numOutputRows SQL metric using the QueryExecution of a structured query. Something worth considering.
scala> :type q
org.apache.spark.sql.DataFrame
scala> :type q.queryExecution.executedPlan.metrics
Map[String,org.apache.spark.sql.execution.metric.SQLMetric]
q.queryExecution.executedPlan.metrics("numOutputRows").value
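The lookup above can be wrapped in a small helper. A sketch only: `numOutputRows` is a hypothetical function name, and the metric key may be absent depending on the plan's top operator, hence the `Option`:

```scala
import org.apache.spark.sql.DataFrame

// Returns the top operator's numOutputRows metric value, if it exposes one.
def numOutputRows(df: DataFrame): Option[Long] =
  df.queryExecution.executedPlan.metrics.get("numOutputRows").map(_.value)
```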