SparkAppHandle Listener not getting invoked


Problem Description

I'm trying to submit a Spark 2.3 job to a Kubernetes cluster from Scala, using the Play framework.

I have also tried it as a simple Scala program, without the Play framework.

The job is submitted to the K8s cluster, but stateChanged & infoChanged are never invoked. I also want to be able to get handle.getAppId.

I'm using spark-submit to submit the job, as described here:

$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar

Here is the code for the job:

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
import scala.util.control.NonFatal

def index = Action {
  try {
    val spark = new SparkLauncher()
      .setMaster("my k8 apiserver host")
      .setVerbose(true)
      .addSparkArg("--verbose")
      .setMainClass("myClass")
      .setAppResource("hdfs://server/inputs/my.jar")
      .setConf("spark.app.name", "myapp")
      .setConf("spark.executor.instances", "5")
      .setConf("spark.kubernetes.container.image", "mydockerimage")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {

        def infoChanged(handle: SparkAppHandle): Unit = {
          println(s"Spark App Id [${handle.getAppId}] Info Changed. State [${handle.getState}]")
        }

        def stateChanged(handle: SparkAppHandle): Unit = {
          println(s"Spark App Id [${handle.getAppId}] State Changed. State [${handle.getState}]")
          if (handle.getState == SparkAppHandle.State.FINISHED) System.exit(0)
        }
      })

    Ok(spark.getState().toString())
  } catch {
    case NonFatal(e) =>
      println("failed with exception: " + e)
      Ok
  }
}

Recommended Answer

Spark Launcher Architecture Overview

SparkLauncher lets you run the spark-submit command programmatically; it spawns spark-submit as a separate child process of the client JVM. Your client's main function needs to wait until the driver has launched in K8s and the listener callbacks arrive. Otherwise, the JVM's main thread exits, killing the client without reporting anything.

-----------------------                       -----------------------
|      User App       |     spark-submit      |      Spark App      |
|                     |  -------------------> |                     |
|         ------------|                       |-------------        |
|         |           |        hello          |            |        |
|         | L. Server |<----------------------| L. Backend |        |
|         |           |                       |            |        |
|         -------------                       -----------------------
|               |     |                              ^
|               v     |                              |
|        -------------|                              |
|        |            |      <per-app channel>       |
|        | App Handle |<------------------------------
|        |            |
-----------------------
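
To make the diagram concrete, here is a minimal sketch (mine, not part of the original answer; the master URL and jar path are placeholders) that exercises the per-app channel by polling the handle instead of registering a listener:

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object PollingExample {
  def main(args: Array[String]): Unit = {
    // startApplication() also works with no listeners; the returned handle
    // stays connected to the launcher server over the per-app channel.
    val handle: SparkAppHandle = new SparkLauncher()
      .setMaster("k8s://http://127.0.0.1:8001")         // placeholder
      .setAppResource("local:///path/to/examples.jar")  // placeholder
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setDeployMode("cluster")
      .startApplication()

    // The client JVM must stay alive for state transitions to be observed.
    while (!handle.getState.isFinal) {
      println(s"state=${handle.getState}, appId=${handle.getAppId}")
      Thread.sleep(1000)
    }
  }
}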

Solution

I have added a j.u.c.CountDownLatch implementation that prevents the main thread from exiting until appState.isFinal is reached.

import java.util.concurrent.CountDownLatch

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object SparkLauncherExample {
  def main(args: Array[String]): Unit = {

    val countDownLatch = new CountDownLatch(1)

    val handle = new SparkLauncher()
      .setMaster("k8s://http://127.0.0.1:8001")
      .setAppResource("local:/{PATH}/spark-examples_2.11-2.3.0.jar")
      .setConf("spark.app.name", "spark-pi")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setConf("spark.executor.instances", "5")
      .setConf("spark.kubernetes.container.image", "spark:spark-docker")
      .setConf("spark.kubernetes.driver.pod.name", "spark-pi-driver")
      .setDeployMode("cluster")
      .startApplication(new SparkAppHandle.Listener() {
        def infoChanged(handle: SparkAppHandle): Unit = {
        }

        def stateChanged(handle: SparkAppHandle): Unit = {
          val appState = handle.getState()
          println(s"Spark App Id [${handle.getAppId}] State Changed. State [${handle.getState}]")

          if (appState != null && appState.isFinal) {
            countDownLatch.countDown() // release the latch once the driver reaches a final state
          }
        }
      })

    countDownLatch.await() // keep the main thread alive until the Spark app finishes
  }
}
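
Back in the asker's Play setting, the same idea can be adapted so the action can actually return handle.getAppId: complete a Promise from the callbacks and block the request thread (the JVM main thread is kept alive by Play anyway). A hedged sketch, reusing the asker's placeholder values, not part of the original answer:

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

def submitAndGetAppId(): String = {
  val appId = Promise[String]()

  new SparkLauncher()
    .setMaster("my k8 apiserver host")        // asker's placeholder
    .setMainClass("myClass")                  // asker's placeholder
    .setAppResource("hdfs://server/inputs/my.jar")
    .setConf("spark.app.name", "myapp")
    .setConf("spark.kubernetes.container.image", "mydockerimage")
    .setDeployMode("cluster")
    .startApplication(new SparkAppHandle.Listener() {
      // The app id is typically only known once the driver has registered,
      // so try to complete the promise on every callback.
      def infoChanged(handle: SparkAppHandle): Unit =
        Option(handle.getAppId).foreach(appId.trySuccess)

      def stateChanged(handle: SparkAppHandle): Unit =
        Option(handle.getAppId).foreach(appId.trySuccess)
    })

  Await.result(appId.future, 5.minutes) // fails the request if the driver never starts
}

In a real service you would hand the Future to Play rather than blocking, but the blocking version keeps the sketch close to the latch-based answer above.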
