How to store a spark DataFrame as CSV into Azure Blob Storage


Question

I'm trying to store a Spark DataFrame as a CSV on Azure Blob Storage from a local Spark cluster

First, I set the config with the Azure Account/Account Key (I'm not sure which is the proper config, so I've set all of them)

sparkContext.getConf.set(s"fs.azure.account.key.${account}.blob.core.windows.net", accountKey)
sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.${account}.dfs.core.windows.net", accountKey)
sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.${account}.blob.core.windows.net", accountKey)
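
As far as I can tell, the wasbs:// file system only consults the Hadoop configuration, and only the blob.core.windows.net key (the dfs.core.windows.net key belongs to the newer abfss:// scheme, and setting SparkConf after the context already exists has no effect), so a minimal sketch would be just:

sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.${account}.blob.core.windows.net", accountKey)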

Then I try to store the CSV with the following

val filePath = s"wasbs://${container}@${account}.blob.core.windows.net/${prefix}/${filename}"
dataFrame.coalesce(1) // collapse to a single partition so only one CSV part file is written
  .write.format("csv")
  .options(Map(
    "header" -> (if (hasHeader) "true" else "false"),
    "sep" -> delimiter,
    "quote" -> quote
  ))
  .save(filePath)
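
Note that even with coalesce(1), save is expected to create a directory named ${filename} containing a single part-*.csv file plus a _SUCCESS marker, rather than a bare CSV file.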

But then this fails with Job aborted and the following stack trace:

org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)

But when I look in the blob container, I can see my file; however, I cannot read it back into a Spark DataFrame. I get the error Unable to infer schema for CSV. It must be specified manually.; and the following stack trace:

org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:185)
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:185)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:184)
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
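
For reference, the read that fails looks like this; a sketch of what I'm running, where the header option is an assumption matching the write options above:

val readBack = spark.read
  .format("csv")
  .option("header", "true") // assumed to match hasHeader at write time
  .load(filePath)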

The issue seems to have already been raised on the Databricks forums!!

What is the proper way to store a DataFrame on Azure Blob?

Answer

It turns out that well before the job fails there is an internal error:

Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
    at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
    at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
    at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
    at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
    ... 18 more

What's happening is that after creating a temp file with the actual data, Spark tries to move the file to the location given by the user, using CloudBlob.startCopyFromBlob. As always, the Microsoft people broke this by renaming the method to CloudBlob.startCopy.
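
One quick way to confirm which azure-storage jar is actually on the classpath (and hence whether it still has startCopyFromBlob) is to print where CloudBlob is loaded from; a small sketch:

// Prints the jar that CloudBlob was loaded from, revealing the
// azure-storage version actually on the classpath
println(
  classOf[com.microsoft.azure.storage.blob.CloudBlob]
    .getProtectionDomain.getCodeSource.getLocation
)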

I'm using "org.apache.hadoop" % "hadoop-azure" % "3.2.1", which is the most recent release of "hadoop-azure", and it still calls the old startCopyFromBlob, so I need to pin an older azure-storage version that still has this method, probably 2.x.x.
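
In sbt that pin would look something like the following; the exact azure-storage version is an assumption (the "probably 2.x.x" guess above), so it may take a couple of tries:

libraryDependencies += "org.apache.hadoop" % "hadoop-azure" % "3.2.1"
// Force an azure-storage release that still ships CloudBlob.startCopyFromBlob;
// 2.2.0 is an assumed 2.x.x version, not a verified one
dependencyOverrides += "com.microsoft.azure" % "azure-storage" % "2.2.0"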

See https://github.com/Azure/azure-storage-java/issues/113
