How to properly access dbutils in Scala when using Databricks Connect


Problem description

I'm using Databricks Connect to run code in my Azure Databricks cluster locally from IntelliJ IDEA (Scala).

Everything works fine. I can connect, debug, and inspect locally in the IDE.

I created a Databricks Job to run my custom app JAR, but it fails with the following exception:

19/08/17 19:20:26 ERROR Uncaught throwable from user code: java.lang.NoClassDefFoundError: com/databricks/service/DBUtils$
at Main$.<init>(Main.scala:30)
at Main$.<clinit>(Main.scala)

Line 30 of my Main.scala class is

val dbutils: DBUtils.type = com.databricks.service.DBUtils

just like in this documentation page.

That page shows a way to access DBUtils that works both locally and in the cluster. But the example only shows Python, and I'm using Scala.

What's the proper way to access it so that it works both locally using databricks-connect and in a Databricks Job running a JAR?

Update

It seems there are two ways of using DBUtils.

1) The DbUtils class described here. Quoting the docs, this library allows you to build and compile the project, but not run it. This doesn't let you run your local code on the cluster.

2) Databricks Connect, described here. This one allows you to run your local Spark code in a Databricks cluster.

The problem is that these two methods have different setups and package names (see the sketch below). There doesn't seem to be a way to use Databricks Connect locally (it's not available in the cluster) but then have the JAR application use the DbUtils class added via sbt/maven so that the cluster has access to it.
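To make the mismatch concrete, here is a minimal sketch of the two access patterns side by side. The dbfs:/ path is only illustrative, and each snippet compiles only when the corresponding library is on the compile classpath.

object DbUtilsAccessPatterns {
    // 1) The dbutils-api / dbutils_v1 route: the package that resolves when the JAR
    //    runs as a job on the cluster.
    def listViaBackend() =
        com.databricks.dbutils_v1.DBUtilsHolder.dbutils.fs.ls("dbfs:/")

    // 2) The Databricks Connect route: the package that resolves when running locally
    //    through databricks-connect, but is missing on the cluster
    //    (hence the NoClassDefFoundError above).
    def listViaConnect() =
        com.databricks.service.DBUtils.fs.ls("dbfs:/")
}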

Recommended answer

I don't know why the docs you mentioned don't work. Maybe you're using a different dependency?

These docs have an example application you can download. It's a project with a very minimal test, so it doesn't create jobs or try to run them on the cluster -- but it's a start. Also, please note that it uses the older 0.0.1 version of dbutils-api.

So to fix your current issue, instead of using com.databricks.service.DBUtils, try importing dbutils from a different place:

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

Or, if you prefer:

import com.databricks.dbutils_v1.{DBUtilsV1, DBUtilsHolder}

type DBUtils = DBUtilsV1
val dbutils: DBUtils = DBUtilsHolder.dbutils

Also, make sure that you have the following dependency in SBT (maybe try to play with versions if 0.0.3 doesn't work -- the latest one is 0.0.4):

libraryDependencies += "com.databricks" % "dbutils-api_2.11" % "0.0.3"

This question and answer pointed me in the right direction. The answer contains a link to a working GitHub repo which uses dbutils: waimak. I hope that this repo could aid you in further questions about Databricks config and dependencies.

Good luck!

Update

I see, so we have two similar but not identical APIs, and no good way to switch between the local and the backend version (though Databricks Connect promises that it should work anyhow). Please let me propose a workaround.

It's good that Scala is convenient for writing adapters. Here's a code snippet which should work as a bridge -- it defines a DBUtils object that provides a sufficient API abstraction over the two versions of the API: the Databricks Connect one on com.databricks.service.DBUtils, and the backend com.databricks.dbutils_v1.DBUtilsHolder.dbutils API. We achieve this by loading and then using com.databricks.service.DBUtils through reflection -- there are no hard-coded imports of it.

package com.example.my.proxy.adapter

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.catalyst.DefinedByConstructorParams

import scala.util.Try

import scala.language.implicitConversions
import scala.language.reflectiveCalls


trait DBUtilsApi {
    type FSUtils
    type FileInfo

    type SecretUtils
    type SecretMetadata
    type SecretScope

    val fs: FSUtils
    val secrets: SecretUtils
}

trait DBUtils extends DBUtilsApi {
    trait FSUtils {
        def dbfs: org.apache.hadoop.fs.FileSystem
        def ls(dir: String): Seq[FileInfo]
        def rm(dir: String, recurse: Boolean = false): Boolean
        def mkdirs(dir: String): Boolean
        def cp(from: String, to: String, recurse: Boolean = false): Boolean
        def mv(from: String, to: String, recurse: Boolean = false): Boolean
        def head(file: String, maxBytes: Int = 65536): String
        def put(file: String, contents: String, overwrite: Boolean = false): Boolean
    }

    case class FileInfo(path: String, name: String, size: Long)

    trait SecretUtils {
        def get(scope: String, key: String): String
        def getBytes(scope: String, key: String): Array[Byte]
        def list(scope: String): Seq[SecretMetadata]
        def listScopes(): Seq[SecretScope]
    }

    case class SecretMetadata(key: String) extends DefinedByConstructorParams
    case class SecretScope(name: String) extends DefinedByConstructorParams
}

object DBUtils extends DBUtils {

    import Adapters._

    override lazy val (fs, secrets): (FSUtils, SecretUtils) = Try[(FSUtils, SecretUtils)](
        (ReflectiveDBUtils.fs, ReflectiveDBUtils.secrets)    // try to use the Databricks Connect API
    ).getOrElse(
        (BackendDBUtils.fs, BackendDBUtils.secrets)    // if it's not available, use com.databricks.dbutils_v1.DBUtilsHolder
    )

    private object Adapters {
        // The apparent code copying here is for performance -- the ones for `ReflectiveDBUtils` use reflection, while
        // the `BackendDBUtils` call the functions directly.

        implicit class FSUtilsFromBackend(underlying: BackendDBUtils.FSUtils) extends FSUtils {
            override def dbfs: FileSystem = underlying.dbfs
            override def ls(dir: String): Seq[FileInfo] = underlying.ls(dir).map(fi => FileInfo(fi.path, fi.name, fi.size))
            override def rm(dir: String, recurse: Boolean = false): Boolean = underlying.rm(dir, recurse)
            override def mkdirs(dir: String): Boolean = underlying.mkdirs(dir)
            override def cp(from: String, to: String, recurse: Boolean = false): Boolean = underlying.cp(from, to, recurse)
            override def mv(from: String, to: String, recurse: Boolean = false): Boolean = underlying.mv(from, to, recurse)
            override def head(file: String, maxBytes: Int = 65536): String = underlying.head(file, maxBytes)
            override def put(file: String, contents: String, overwrite: Boolean = false): Boolean = underlying.put(file, contents, overwrite)
        }

        implicit class FSUtilsFromReflective(underlying: ReflectiveDBUtils.FSUtils) extends FSUtils {
            override def dbfs: FileSystem = underlying.dbfs
            override def ls(dir: String): Seq[FileInfo] = underlying.ls(dir).map(fi => FileInfo(fi.path, fi.name, fi.size))
            override def rm(dir: String, recurse: Boolean = false): Boolean = underlying.rm(dir, recurse)
            override def mkdirs(dir: String): Boolean = underlying.mkdirs(dir)
            override def cp(from: String, to: String, recurse: Boolean = false): Boolean = underlying.cp(from, to, recurse)
            override def mv(from: String, to: String, recurse: Boolean = false): Boolean = underlying.mv(from, to, recurse)
            override def head(file: String, maxBytes: Int = 65536): String = underlying.head(file, maxBytes)
            override def put(file: String, contents: String, overwrite: Boolean = false): Boolean = underlying.put(file, contents, overwrite)
        }

        implicit class SecretUtilsFromBackend(underlying: BackendDBUtils.SecretUtils) extends SecretUtils {
            override def get(scope: String, key: String): String = underlying.get(scope, key)
            override def getBytes(scope: String, key: String): Array[Byte] = underlying.getBytes(scope, key)
            override def list(scope: String): Seq[SecretMetadata] = underlying.list(scope).map(sm => SecretMetadata(sm.key))
            override def listScopes(): Seq[SecretScope] = underlying.listScopes().map(ss => SecretScope(ss.name))
        }

        implicit class SecretUtilsFromReflective(underlying: ReflectiveDBUtils.SecretUtils) extends SecretUtils {
            override def get(scope: String, key: String): String = underlying.get(scope, key)
            override def getBytes(scope: String, key: String): Array[Byte] = underlying.getBytes(scope, key)
            override def list(scope: String): Seq[SecretMetadata] = underlying.list(scope).map(sm => SecretMetadata(sm.key))
            override def listScopes(): Seq[SecretScope] = underlying.listScopes().map(ss => SecretScope(ss.name))
        }
    }
}

object BackendDBUtils extends DBUtilsApi {
    import com.databricks.dbutils_v1

    private lazy val dbutils: DBUtils = dbutils_v1.DBUtilsHolder.dbutils
    override lazy val fs: FSUtils = dbutils.fs
    override lazy val secrets: SecretUtils = dbutils.secrets

    type DBUtils = dbutils_v1.DBUtilsV1
    type FSUtils = dbutils_v1.DbfsUtils
    type FileInfo = com.databricks.backend.daemon.dbutils.FileInfo

    type SecretUtils = dbutils_v1.SecretUtils
    type SecretMetadata = dbutils_v1.SecretMetadata
    type SecretScope = dbutils_v1.SecretScope
}

object ReflectiveDBUtils extends DBUtilsApi {
    // This throws a ClassNotFoundException when the Databricks Connect API isn't available -- it's much better than
    // the NoClassDefFoundError, which we would get if we had a hard-coded import of com.databricks.service.DBUtils .
    // As we're just using reflection, we're able to recover if it's not found.
    private lazy val dbutils: DBUtils =
        Class.forName("com.databricks.service.DBUtils$").getField("MODULE$").get(null).asInstanceOf[DBUtils]

    override lazy val fs: FSUtils = dbutils.fs
    override lazy val secrets: SecretUtils = dbutils.secrets

    type DBUtils = AnyRef {
        val fs: FSUtils
        val secrets: SecretUtils
    }

    type FSUtils = AnyRef {
        def dbfs: org.apache.hadoop.fs.FileSystem
        def ls(dir: String): Seq[FileInfo]
        def rm(dir: String, recurse: Boolean): Boolean
        def mkdirs(dir: String): Boolean
        def cp(from: String, to: String, recurse: Boolean): Boolean
        def mv(from: String, to: String, recurse: Boolean): Boolean
        def head(file: String, maxBytes: Int): String
        def put(file: String, contents: String, overwrite: Boolean): Boolean
    }

    type FileInfo = AnyRef {
        val path: String
        val name: String
        val size: Long
    }

    type SecretUtils = AnyRef {
        def get(scope: String, key: String): String
        def getBytes(scope: String, key: String): Array[Byte]
        def list(scope: String): Seq[SecretMetadata]
        def listScopes(): Seq[SecretScope]
    }

    type SecretMetadata = DefinedByConstructorParams { val key: String }

    type SecretScope = DefinedByConstructorParams { val name: String }
}

If you replace the val dbutils: DBUtils.type = com.databricks.service.DBUtils which you mentioned in your Main with val dbutils: DBUtils.type = com.example.my.proxy.adapter.DBUtils, everything should work as a drop-in replacement, both locally and remotely. A minimal usage sketch follows.
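For example, a hypothetical Main along these lines should behave the same locally and as a cluster job; the dbfs:/ path and the println are purely illustrative:

object Main {
    // Drop-in replacement for the previous com.databricks.service.DBUtils reference
    val dbutils: com.example.my.proxy.adapter.DBUtils.type = com.example.my.proxy.adapter.DBUtils

    def main(args: Array[String]): Unit = {
        // Resolves to Databricks Connect locally, or falls back to dbutils_v1 on the cluster
        dbutils.fs.ls("dbfs:/").foreach(fi => println(s"${fi.path} (${fi.size} bytes)"))
    }
}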

If you get some new NoClassDefFoundErrors, try adding specific dependencies to the JAR job, or try rearranging them, changing the versions, or marking the dependencies as provided (see the build.sbt sketch below).
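As an illustration, a build.sbt along these lines keeps the cluster-provided pieces out of the assembled JAR; the artifact names and versions here are assumptions and should be matched to your cluster runtime:

// Sketch only -- adjust versions to match your cluster runtime
libraryDependencies ++= Seq(
    // Needed to compile against dbutils; the cluster provides the implementation at runtime
    "com.databricks" % "dbutils-api_2.11" % "0.0.3" % Provided,
    // Spark is provided both by databricks-connect locally and by the cluster
    "org.apache.spark" %% "spark-sql" % "2.4.3" % Provided
)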

This adapter isn't pretty, and it uses reflection, but it should be good enough as a workaround, I hope. Good luck :)

