将Spark Dataframe从Azure Databricks的笔记本作业保存到Azure Blob存储会导致java.lang.NoSuchMethodError [英] Saving spark dataframe from azure databricks' notebook job to azure blob storage causes java.lang.NoSuchMethodError

查看:104
本文介绍了将Spark Dataframe从Azure Databricks的笔记本作业保存到Azure Blob存储会导致java.lang.NoSuchMethodError的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在天蓝色的数据块中使用笔记本创建了一个简单的作业. 我正在尝试将笔电数据帧从笔记本保存到天蓝色的Blob存储中. 附加示例代码

I have created a simple job using notebook in azure databricks. I am trying to save a spark dataframe from notebook to azure blob storage. Attaching the sample code

import traceback

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Attached the spark submit command used
# spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,
# com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py

# Tried with com.microsoft.azure:azure-storage:2.2.0

SECRET_ACCESS_KEY = "xxxxx"
STORAGE_NAME = "my_storage"
CONTAINER = "my_container"
SUB_PATH = "/azure_dbs_check/"
FILE_NAME = "result"

spark = SparkSession \
    .builder \
    .appName("azure_dbs_to_azure_blob") \
    .getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
df.show()

try:
    spark_context = spark.sparkContext
    fs_acc_key = "fs.azure.account.key." + STORAGE_NAME + ".blob.core.windows.net"

    spark.conf.set("fs.wasbs.impl",
                   "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    spark.conf.set(fs_acc_key, SECRET_ACCESS_KEY)

    file_path = 'wasbs://' + CONTAINER + '@' + STORAGE_NAME + '.blob.core.windows.net' + SUB_PATH + FILE_NAME

    df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")
    print("Written successful")
except Exception as exp:
    print("Exception occurred")
    print(traceback.format_exc())

当我在本地计算机上运行spark-submit时,以上代码有效. 使用的spark提交命令是

The above code works when i run the spark-submit in local machine. The spark submit command used is

spark-submit --master local [1] --packages org.apache.hadoop:hadoop-azure:2.7.2,com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py

spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py

可能的根本原因可能是

原因:java.lang.NoSuchMethodError:
com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob

Caused by: java.lang.NoSuchMethodError:
com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob

因此,我将该程序包降级为 com.microsoft.azure:azure-storage:2.2.0 ,其中包含 startCopyFromBlob 方法.
(在com.microsoft.azure:azure-storage:3.x.x版本中,CloudBlob上此 已弃用的startCopyFromBlob() 已删除)

So i downgraded the package to com.microsoft.azure:azure-storage:2.2.0 which contains the startCopyFromBlob method.
(In com.microsoft.azure:azure-storage:3.x.x versions,this deprecated startCopyFromBlob() on CloudBlob is removed)

即使降级后,错误仍然保持不变.

Error remains the same even after downgrading process.

附加错误堆栈跟踪,

    Traceback (most recent call last):
      File "<command-4281470986294005>", line 28, in <module>
        df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")
      File "/databricks/spark/python/pyspark/sql/readwriter.py", line 738, in save
        self._jwrite.save(path)
      File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/databricks/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling o255.save.
    : org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:192)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:110)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:108)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:128)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:116)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
        at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
        at py4j.Gateway.invoke(Gateway.java:295)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:251)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 52, 10.2.3.12, executor 0): org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
        at org.apache.spark.scheduler.Task.run(Task.scala:112)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.IllegalStateException: Error closing the output.
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:880)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.close(UnivocityGenerator.scala:85)
        at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CSVFileFormat.scala:193)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 11 more
    Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
        at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
        at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
        ... 19 more

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2355)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2343)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2342)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2342)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1096)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1096)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2574)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2510)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:893)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2243)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
        ... 33 more
    Caused by: org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:257)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
        at org.apache.spark.scheduler.Task.run(Task.scala:112)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
    Caused by: java.lang.IllegalStateException: Error closing the output.
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:880)
        at org.apache.spark.sql.execution.datasources.csv.UnivocityGenerator.close(UnivocityGenerator.scala:85)
        at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.close(CSVFileFormat.scala:193)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
        at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:74)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1560)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
        ... 11 more
    Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
        at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
        at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
        at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
        at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
        at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
        at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
        at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
        ... 19 more

包含的Spark提交软件包:

Included Spark submit packages:

  • org.apache.hadoop:hadoop-azure:2.7.2,
  • com.microsoft.azure:azure-storage:3.1.0(后来尝试使用 com.microsoft.azure:azure-storage:2.2.0 )

本地计算机:
Python 3.6
Spark版本2.4.4使用Scala版本2.11.12

Local machine:
Python 3.6
Spark version 2.4.4 Using Scala version 2.11.12

数据块详细信息:
群集信息:
5.5 LTS(包括Apache Spark 2.4.3,Scala 2.11)
Python 3(3.5)

Databricks details:
Cluster information:
5.5 LTS (includes Apache Spark 2.4.3, Scala 2.11)
Python 3 (3.5)

运行时5.5发行说明表示软件包 com.microsoft.azure azure-storage 5.2.0 已安装在环境中.

The Runtime 5.5 release notes says that the package com.microsoft.azure azure-storage 5.2.0 is already installed in the environment.

即使作业中指定了另一个版本( 2.2.0 ),也是由于火花从环境( 5.2.0版本)中带走库而导致的问题吗? 在5.2.0等版本中,方法 startCopyFromBlob()

Is the problem due to spark taking the library from environment(5.2.0 version) even though another version(2.2.0) is specified in job? In versions like 5.2.0,the method startCopyFromBlob() is removed.

我已经在 观察:

  1. Databricks Job使用预安装的库azure-storage:5.2.0.此程序包没有 com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob()方法. (在4.x.x版本中由startCopy()替换).天青存储固定为5.2.0

  1. Databricks Job uses pre-installed library azure-storage:5.2.0. This package does not have com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob() method. (replaced by startCopy() in 4.x.x versions). azure-storage is fixed as 5.2.0

因此,我尝试使用最新的hadoop-azure:3.2.1来尝试获取不调用不赞成使用的方法的jar.但这导致了新的错误
java.lang.NoClassDefFoundError:org/apache/hadoop/fs/StreamCapabilities .

So i tried to use latest hadoop-azure:3.2.1 in an attempt to get jar which does not call the deprecated method. But this caused a new error
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/StreamCapabilities .

StreamCapabilities类存在于hadoop-common软件包中.所以我加入了最新的hadoop-common(3.2.1).
这引起了 java.lang.NoSuchMethodError:org.apache.hadoop.security.ProviderUtils.excludeIncompatibleCredentialProviders().
原因:
org.apache.hadoop:hadoop-common:2.7.3 已在天蓝色的运行时预先安装.这个hadoop-common:2.7.3没有 ProviderUtils.excludeIncompatibleCredentialProviders()方法.

StreamCapabilities class is present in hadoop-common packages. So i included latest hadoop-common (3.2.1).
This caused java.lang.NoSuchMethodError: org.apache.hadoop.security.ProviderUtils.excludeIncompatibleCredentialProviders().
Reason:
org.apache.hadoop:hadoop-common:2.7.3 is pre-installed on azure run time. This hadoop-common:2.7.3 does not have ProviderUtils.excludeIncompatibleCredentialProviders() method.

由于两个软件包(hadoop-common:2.7.3和azure-storage:5.2.0)都是固定的(预安装),因此我尝试使用较低的hadoop-azure软件包来尝试查找不调用 excludeIncompatibleCredentialProviders()方法的版本.

Since both packages(hadoop-common:2.7.3 & azure-storage:5.2.0) are fixed(pre-installed), i tried to use the lower hadoop-azure packages in an attempt to find version which does not call excludeIncompatibleCredentialProviders() method.

hadoop-azure:3.2.1(截至2019年11月) hadoop-azure:2.8.0 excludeIncompatibleCredentialProviders() 被称为内部.
低于2.8.0 ,开始出现旧错误
NoSuchMethodError:com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob

From hadoop-azure:3.2.1(latest as of Nov 2019) to hadoop-azure:2.8.0, excludeIncompatibleCredentialProviders() is called inside.
Below 2.8.0, am started to get the old error
NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob

推荐答案

一种替代方法是创建一个安装架:

One alternative is to create a mount:

https://docs.databricks.com/data/data-sources/azure/azure-storage.html

然后根据需要调整保存路径.

And then adjust the save path as necessary.

我也建议使用此

spark.conf.set(
  "fs.azure.account.key.<storage-account-name>.blob.core.windows.net",
  "<storage-account-access-key>")

代替

spark_context._jsc.hadoopConfiguration().set(fs_acc_key, SECRET_ACCESS_KEY)

由于您使用的是DataFrame API,而不是RDD API.

Since you are using the DataFrame api instead of the RDD api.

编辑

在Databricks社区集群中运行以下代码,并修改spark.conf.set语句.

Ran the following code in a Databricks Community cluster and modified the spark.conf.set statements.

import traceback

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# Attached the spark submit command used
# spark-submit --master local[1] --packages org.apache.hadoop:hadoop-azure:2.7.2,
# com.microsoft.azure:azure-storage:3.1.0 ./write_to_blob_from_spark.py

# Tried with com.microsoft.azure:azure-storage:2.2.0

SECRET_ACCESS_KEY = "ACCESSKEY"
STORAGE_NAME = "ACCOUNTNAME"
CONTAINER = "CONTAINER"
SUB_PATH = "/azure_dbs_check/"
FILE_NAME = "result"

spark = SparkSession \
    .builder \
    .appName("azure_dbs_to_azure_blob") \
    .getOrCreate()

df = spark.createDataFrame(["10", "11", "13"], StringType()).toDF("age")
df.show()

try:
    fs_acc_key = "fs.azure.account.key." + STORAGE_NAME + ".blob.core.windows.net"

    spark.conf.set("spark.hadoop.fs.wasb.impl", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    spark.conf.set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    spark.conf.set(fs_acc_key, SECRET_ACCESS_KEY)

    file_path = 'wasbs://' + CONTAINER + '@' + STORAGE_NAME + '.blob.core.windows.net' + SUB_PATH + FILE_NAME

    print(file_path)

    df.write.save(file_path + '_csv', format='csv', header=True, mode="overwrite")

    print("Written successful")
except Exception as exp:
    print("Exception occurred")
    print(traceback.format_exc())

这篇关于将Spark Dataframe从Azure Databricks的笔记本作业保存到Azure Blob存储会导致java.lang.NoSuchMethodError的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆