How to get the number of records written (using DataFrameWriter's save operation)?
Is there any way to get the number of records written when using spark to save records? While I know it isn't in the spec currently, I'd like to be able to do something like:
val count = df.write.csv(path)
Alternatively, being able to do an inline count (preferably without just using a standard accumulator) of the results of a step would be (almost) as effective. i.e.:
dataset.countTo(count_var).filter({function}).countTo(filtered_count_var).collect()
Any ideas?
I'd use SparkListener, which can intercept onTaskEnd or onStageCompleted events that give you access to task metrics.
Task metrics give you the accumulators Spark uses to display metrics in the SQL tab (under Details for Query).
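As a rough sketch of that approach, you could register a SparkListener that sums the per-task output metrics around a write. This assumes an active SparkSession named `spark` and a DataFrame named `df`; the output path is a placeholder, and note that listener events are delivered asynchronously, so the counter may lag slightly behind the action:

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Accumulate records written across all tasks of the job.
val recordsWritten = new AtomicLong(0L)

spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // outputMetrics tracks rows this task wrote to external storage
    recordsWritten.addAndGet(taskEnd.taskMetrics.outputMetrics.recordsWritten)
  }
})

df.write.csv("/tmp/people-out")   // placeholder path; triggers the job
println(s"records written: ${recordsWritten.get}")
```

If other jobs run on the same SparkSession, you'd want to filter by job or stage ID so their tasks don't inflate the count.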
For example, the following query:
spark.
read.
option("header", true).
csv("../datasets/people.csv").
limit(10).
write.
csv("people")
gives exactly 10 output rows, so Spark knows the count (and you can too).
You could also explore Spark SQL's QueryExecutionListener:
The interface of query execution listener that can be used to analyze execution metrics.
You can register a QueryExecutionListener using ExecutionListenerManager, which is available as spark.listenerManager.
scala> :type spark.listenerManager
org.apache.spark.sql.util.ExecutionListenerManager
scala> spark.listenerManager.
clear clone register unregister
I think it's closer to the "bare metal", but I haven't used it before.
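A minimal sketch of that registration, assuming an active SparkSession named `spark` (whether a given plan exposes the numOutputRows metric depends on its top-level operator, hence the guarded lookup):

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // Not every executed plan carries this metric, so look it up defensively.
    qe.executedPlan.metrics.get("numOutputRows").foreach { metric =>
      println(s"$funcName produced ${metric.value} rows")
    }
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
})
```

Once registered, the callback fires after every successful action on the session, which makes it handy for counting without touching individual queries.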
@D3V (in the comments section) mentioned accessing the numOutputRows SQL metric using the QueryExecution of a structured query. Something worth considering.
scala> :type q
org.apache.spark.sql.DataFrame
scala> :type q.queryExecution.executedPlan.metrics
Map[String,org.apache.spark.sql.execution.metric.SQLMetric]
q.queryExecution.executedPlan.metrics("numOutputRows").value
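To make the REPL snippet above concrete: the metric is only populated after the plan has actually run, so you need to execute an action first. A hedged sketch, assuming an active SparkSession `spark` (the CSV path is the same placeholder used earlier in the answer):

```scala
val q = spark.read.option("header", true).csv("../datasets/people.csv").limit(10)

q.collect()  // execute the query so the SQL metrics get filled in

// Read the metric off the top operator of the executed physical plan.
val rows = q.queryExecution.executedPlan.metrics("numOutputRows").value
println(s"numOutputRows = $rows")
```

Keep in mind that executedPlan.metrics only covers the top-level operator; metrics for lower operators live on their respective plan nodes.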