How to stop a notebook streaming job gracefully?

Question

I have a streaming application running in a Databricks notebook job (https://docs.databricks.com/jobs.html). I would like to stop the streaming job gracefully using the stop() method of the StreamingQuery instance returned by stream.start(). That requires access either to that streaming query instance or to the context of the running job itself. In the second case, the code could look like this:

```scala
spark.sqlContext.streams.get("some_streaming_uuid").stop()
```

This code should be executed from a different notebook job, let's call it stop_streaming_job, but I wasn't able to find a way to access the job context from there and execute the Scala code above. Is there any way to achieve this with Databricks notebooks?

Answer

One way to solve this is to use the Databricks File System (DBFS) or the local file system. The idea is to extend the Spark StreamingQuery class with a new method called awaitExternalTermination. The solution creates a new file in a given DBFS directory, and that file acts as a flag controlling the lifetime of the streaming job: the job keeps running as long as the file exists in the directory. Below is the implementation of the file watcher, an extension method on StreamingQuery that uses Scala futures:

```scala
import java.util.concurrent.ThreadLocalRandom

import org.apache.spark.sql.streaming.StreamingQuery

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

object extensions {
  import fs._ // FileSystemWrapper, DbfsWrapper, LocalFileSystemWrapper (from the full code)

  object FileSystemType extends Enumeration {
    val DBFS, LocalFileSystem = Value
  }

  implicit class FileSystemStopStreamingQuery(val self: StreamingQuery) extends AnyVal {
    /**
     * Extension method for StreamingQuery: waits for an external process to delete the
     * flag file and, when that happens, calls stop() on this StreamingQuery instance.
     *
     * @param streamStopDir directory to watch
     * @param jobName       unique job identifier, used as the flag file name
     * @param fsType        DBFS or LocalFileSystem
     */
    def awaitExternalTermination(streamStopDir: String, jobName: String, fsType: FileSystemType.Value): Unit = {
      if (streamStopDir == null || streamStopDir.isEmpty)
        throw new IllegalArgumentException("streamStopDir can't be null or empty.")

      if (jobName == null || jobName.isEmpty)
        throw new IllegalArgumentException("jobName can't be null or empty.")

      val fsWrapper: FileSystemWrapper = fsType match {
        case FileSystemType.DBFS            => new DbfsWrapper(streamStopDir, jobName)
        case FileSystemType.LocalFileSystem => new LocalFileSystemWrapper(streamStopDir, jobName)
        case _                              => throw new IllegalArgumentException("Invalid file system provided.")
      }

      val stopWatchFuture: Future[Boolean] = Future {
        // Create the flag file (containing the query id) if it is not there yet.
        if (!fsWrapper.targetFileExists)
          fsWrapper.createTargetFile(self.id.toString)

        // Poll every 10-100 ms while the query is running and the flag file is present.
        while (self.isActive && fsWrapper.targetFileExists) {
          Thread.sleep(ThreadLocalRandom.current().nextLong(10, 100 + 1))
        }

        // Flag file removed: stop the query. Otherwise the query ended on its own.
        if (!fsWrapper.targetFileExists) {
          self.stop()
          true
        } else
          false
      }

      // Report the watcher outcome instead of writing it to an unused variable.
      stopWatchFuture.onComplete {
        case Success(true)  => println(s"Streaming query ${self.id} stopped after the flag file was removed.")
        case Success(false) => println(s"Streaming query ${self.id} terminated before the flag file was removed.")
        case Failure(t)     => println(s"Flag-file watcher failed: ${t.getMessage}")
      }

      // Block until the query terminates, whether through stop() or otherwise.
      self.awaitTermination()
    }
  }
}
```
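
Because the watcher above needs a live StreamingQuery, it is hard to try out in isolation. The following is a minimal, Spark-free sketch of the same flag-file mechanism, assuming a local path and replacing `self.isActive` and `self.stop()` with two callbacks. The `FlagFileWatcher` object and its parameters are hypothetical names used only for illustration.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical, Spark-free version of the flag-file pattern:
// create the flag, poll until it disappears or the job ends, then call stop.
object FlagFileWatcher {
  def watch(flag: Path, isActive: () => Boolean, stop: () => Unit)
           (implicit ec: ExecutionContext): Future[Boolean] = {
    // Create the flag before polling starts so the caller can rely on it existing.
    if (!Files.exists(flag)) Files.createFile(flag)
    Future {
      // Same 10-100 ms randomized polling interval as the answer's loop.
      while (isActive() && Files.exists(flag))
        Thread.sleep(ThreadLocalRandom.current().nextLong(10, 101))
      if (!Files.exists(flag)) { stop(); true } // flag removed: stop requested
      else false                                // job finished on its own
    }
  }
}
```

Here the callbacks stand in for `self.isActive` and `self.stop()`. The Future completes with `true` when the deletion triggered the stop, and with `false` when the job finished by itself.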

And the implementation of the DBFS wrapper class:

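```scala
import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

// FileSystemWrapper lives in the fs package of the full code (not shown here);
// targetPath is assumed to be provided by that trait (stopDir + "/" + targetFile).
class DbfsWrapper(val stopDir: String, val targetFile: String) extends FileSystemWrapper {
  // A missing path makes dbutils.fs.ls throw FileNotFoundException, which means "no flag".
  override def targetFileExists(): Boolean = {
    try {
      dbutils.fs.ls(targetPath).nonEmpty
    } catch {
      case _: java.io.FileNotFoundException => false
    }
  }

  // Writes the given content (the query id) into the flag file.
  override def createTargetFile(content: String): Unit = {
    dbutils.fs.put(targetPath, content)
  }
}
```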
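
The watcher also references a `LocalFileSystemWrapper` and the `FileSystemWrapper` trait, both of which are only in the linked full code. The following is a minimal sketch of what they might look like. It assumes the trait exposes `targetPath`, `targetFileExists` and `createTargetFile`, which is inferred from how they are used above and is not the author's actual definition.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// ASSUMPTION: guessed shape of the answer's FileSystemWrapper trait.
trait FileSystemWrapper {
  def stopDir: String
  def targetFile: String
  // Flag-file location: <stopDir>/<targetFile>
  def targetPath: String = s"$stopDir/$targetFile"
  def targetFileExists(): Boolean
  def createTargetFile(content: String): Unit
}

// Hypothetical local-disk implementation based on java.nio.
class LocalFileSystemWrapper(val stopDir: String, val targetFile: String)
    extends FileSystemWrapper {
  override def targetFileExists(): Boolean = Files.exists(Paths.get(targetPath))

  override def createTargetFile(content: String): Unit = {
    // Make sure the watched directory exists before writing the flag file.
    Files.createDirectories(Paths.get(stopDir))
    // Store the content (e.g. the query id) so the file shows which query it controls.
    Files.write(Paths.get(targetPath), content.getBytes(StandardCharsets.UTF_8))
  }
}
```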

To stop the streaming job, delete the flag file: use %fs rm -r your_path on DBFS, or plain rm -r your_path on the local file system. The watcher notices that the file is gone and calls stop() on the query.
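
You can also delete the file from code inside the stop_streaming_job notebook instead of using a shell or `%fs` command. The following is a minimal local-file-system sketch. The `StopStreamingJob` object and `requestStop` helper are hypothetical names, and the path layout assumes the flag file is `<stopDir>/<jobName>`, as in the wrappers above.

```scala
import java.nio.file.{Files, Paths}

// Hypothetical helper for the stop_streaming_job notebook (local FS only).
// Deleting the flag file makes the watcher in the streaming job call stop().
// On DBFS the equivalent call would be dbutils.fs.rm(s"$stopDir/$jobName").
object StopStreamingJob {
  /** Returns true if a flag file existed and was removed. */
  def requestStop(stopDir: String, jobName: String): Boolean =
    Files.deleteIfExists(Paths.get(stopDir, jobName))
}
```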

The complete code can be found here.