Log from Spark UDF to driver


Problem description


I have a simple UDF used in Spark on Databricks. I can't use println or log4j or the like, because the output ends up on the executors, and I need it on the driver. I have a very simple logging setup:

var logMessage = ""

def log(msg: String){
  logMessage += msg + "\n"
}

def writeLog(file: String){
  println("start write")
  println(logMessage)
  println("end write")
}

def warning(msg: String){
  log("*WARNING* " + msg)
}

val CleanText = (s: Int) => {
  log("I am in this UDF")
  s + 2
}

sqlContext.udf.register("CleanText", CleanText)
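
For context (my note, not part of the original question): registering the UDF ships the closure to the executors, and each executor mutates its own deserialized copy of logMessage; the driver's copy is never touched. A hypothetical invocation makes this visible (the table name someTable is made up for illustration):

// The UDF body runs inside the executors' JVMs, which each get their own
// serialized copy of the closure. The executors append to *their* copies of
// logMessage, so the driver's logMessage stays empty.
sqlContext.sql("SELECT CleanText(a) FROM someTable").show()
writeLog("unused")  // prints "start write" and "end write" with nothing between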


How can I get this to function properly and log to the driver?

Recommended answer


The closest mechanism in Apache Spark to what you're trying to do is accumulators. You can accumulate log lines on the executors and access the result on the driver:

// required import for the accumulator type:
import org.apache.spark.util.CollectionAccumulator

// create a collection accumulator using the spark context:
val logLines: CollectionAccumulator[String] = sc.collectionAccumulator("log")

// log function adds a line to the accumulator
def log(msg: String): Unit = logLines.add(msg)

// driver-side function can print the log using the accumulator's *value*
def writeLog(): Unit = {
  import scala.collection.JavaConverters._
  println("start write")
  logLines.value.asScala.foreach(println)
  println("end write")
}

// in a plain Spark application (outside a Databricks notebook) you would also need:
//   import org.apache.spark.sql.functions.udf
//   import spark.implicits._   // for toDF and the $ column syntax
val CleanText = udf((s: Int) => {
  log(s"I am in this UDF, got: $s")
  s + 2
})

// use the UDF in some transformation:
Seq(1, 2).toDF("a").select(CleanText($"a")).show()

writeLog()    
// prints: 
// start write
// I am in this UDF, got: 1
// I am in this UDF, got: 2
// end write
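
One more caveat worth adding (not in the original answer): Spark only guarantees exactly-once accumulator updates for accumulators used inside actions. Updates made inside transformations, as here, can be applied more than once if a task is retried, so the same log line may show up twice.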


BUT: this really isn't recommended, especially not for logging purposes. If you log every record, this accumulator will eventually crash your driver with an OutOfMemoryError, or just slow you down horribly.
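
If you still want accumulator-based logging, one way to keep it bounded is to log selectively rather than per record. A minimal sketch reusing the logLines accumulator from above (the name CleanTextSparse and the negative-input condition are illustrative, not from the original answer):

// Sketch (my addition): accumulate only rare/interesting events instead of
// one line per record, so the accumulator stays small.
val CleanTextSparse = udf((s: Int) => {
  if (s < 0) log(s"*WARNING* unexpected negative input: $s") // rare case only
  s + 2
})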

由于您使用的是Databricks,因此我将检查它们支持日志聚合的哪些选项,或者仅使用Spark UI来查看执行程序日志.

Since you're using Databricks, I would check what options they support for log aggregation, or simply use the Spark UI to view the executor logs.
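
For completeness, a common pattern for the latter (my sketch; Spark 2.x ships with log4j 1.x on the classpath) is to log directly on the executors with log4j and read the messages from the executor log pages in the Spark UI:

import org.apache.log4j.Logger

val CleanTextLogged = udf((s: Int) => {
  // Look the logger up inside the lambda: Logger is not serializable, so it
  // must be obtained on the executor rather than captured from the driver.
  val logger = Logger.getLogger("CleanTextUDF")
  logger.warn(s"I am in this UDF, got: $s")
  s + 2
})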

