How to get the number of records written (using DataFrameWriter's save operation)?


Problem description


Is there any way to get the number of records written when using Spark to save records? While I know it isn't in the spec currently, I'd like to be able to do something like:

val count = df.write.csv(path)

Alternatively, being able to do an inline count (preferably without just using a standard accumulator) of the results of a step would be (almost) as effective. i.e.:

dataset.countTo(count_var).filter({function}).countTo(filtered_count_var).collect()

Any ideas?

Solution

I'd use a SparkListener that intercepts the onTaskEnd or onStageCompleted events, which give you access to task metrics.

Task metrics give you the accumulators Spark uses to display metrics in the SQL tab (in Details for Query).

For example, the following query:

spark.
  read.
  option("header", true).
  csv("../datasets/people.csv").
  limit(10).
  write.
  csv("people")

gives exactly 10 output rows so Spark knows it (and you could too).
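
To pick that count up yourself, you could register a SparkListener before the save and sum the recordsWritten output metric over completed tasks. A minimal sketch, assuming Spark 2.x; df and path are placeholders for your own DataFrame and output location, and note that listener events arrive asynchronously and that tasks of any concurrent job on the same SparkContext would be counted as well:

import java.util.concurrent.atomic.LongAdder
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Sums recordsWritten over all completed tasks.
val recordsWritten = new LongAdder

spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // taskMetrics can be null for some failed tasks
    Option(taskEnd.taskMetrics).foreach { metrics =>
      recordsWritten.add(metrics.outputMetrics.recordsWritten)
    }
  }
})

df.write.csv(path)
println(s"Records written: ${recordsWritten.sum}")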


You could also explore Spark SQL's QueryExecutionListener:

The interface of query execution listener that can be used to analyze execution metrics.

You can register a QueryExecutionListener using ExecutionListenerManager that's available as spark.listenerManager.

scala> :type spark.listenerManager
org.apache.spark.sql.util.ExecutionListenerManager

scala> spark.listenerManager.
clear   clone   register   unregister

I think it's closer to the "bare metal", but I haven't used it before.
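
For what it's worth, here is a minimal sketch of registering one, assuming Spark 2.x. Whether numOutputRows shows up among the metrics of the top-level operator depends on the plan and the Spark version, hence the Option:

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // The metric is only defined for some top-level physical operators.
    val rows = qe.executedPlan.metrics.get("numOutputRows").map(_.value)
    println(s"$funcName succeeded, numOutputRows = ${rows.getOrElse("n/a")}")
  }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
})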


@D3V (in the comments section) mentioned accessing the numOutputRows SQL metric using the QueryExecution of a structured query. Something worth considering.

scala> :type q
org.apache.spark.sql.DataFrame

scala> :type q.queryExecution.executedPlan.metrics
Map[String,org.apache.spark.sql.execution.metric.SQLMetric]

q.queryExecution.executedPlan.metrics("numOutputRows").value
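
End to end, the flow could look like the sketch below (untested; the metric accumulators are only populated once the query has actually run, and the lookup throws if the top-level operator doesn't expose numOutputRows):

val q = spark.
  read.
  option("header", true).
  csv("../datasets/people.csv").
  limit(10)

// Run an action first so the metric accumulators get filled in.
q.collect()

val numOutputRows = q.queryExecution.executedPlan.metrics("numOutputRows").value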
