How to store a Spark DataFrame as CSV into Azure Blob Storage


Problem description

I'm trying to store a Spark DataFrame as a CSV on Azure Blob Storage from a local Spark cluster

First, I set the config with the Azure Account/Account Key (I'm not sure what is the proper config so I've set all those)

// Account key on the Spark conf (probably redundant for the Hadoop filesystem, but set just in case)
sparkContext.getConf.set(s"fs.azure.account.key.${account}.blob.core.windows.net", accountKey)

// Hadoop configuration: account key for the ABFS (Data Lake Gen2) endpoint
sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.${account}.dfs.core.windows.net", accountKey)
// Hadoop configuration: account key for the WASB (Blob) endpoint used by wasbs:// paths
sparkContext.hadoopConfiguration.set(s"fs.azure.account.key.${account}.blob.core.windows.net", accountKey)

Then I try to store the CSV with the following

val filePath = s"wasbs://${container}@${account}.blob.core.windows.net/${prefix}/${filename}"
dataFrame.coalesce(1)
  .write.format("csv")
  .options(Map(
    "header" -> (if (hasHeader) "true" else "false"),
    "sep" -> delimiter,
    "quote" -> quote
  ))
  .save(filePath)

But then this fails with Job aborted and the following stack trace

org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:196)
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:228)

But when I look in the blob container I can see my file. However, I cannot read it back into a Spark DataFrame; I get the error Unable to infer schema for CSV. It must be specified manually.; and the following stack trace

org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:185)
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:185)
scala.Option.getOrElse(Option.scala:121)
org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:184)
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:373)
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
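
For reference, the read-back attempt was roughly the following (a minimal sketch; the options shown here are assumptions, and filePath is the same path used for the write above):

spark.read.format("csv")
  .option("header", "true")
  .option("sep", delimiter)
  .load(filePath)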

It seems that the problem was already reported on the Databricks forum!

What is the proper way to store a DataFrame on Azure Blob?

Answer

It turns out that well before the job fails there is an internal error

Caused by: java.lang.NoSuchMethodError: com.microsoft.azure.storage.blob.CloudBlob.startCopyFromBlob(Ljava/net/URI;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/AccessCondition;Lcom/microsoft/azure/storage/blob/BlobRequestOptions;Lcom/microsoft/azure/storage/OperationContext;)Ljava/lang/String;
    at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobWrapperImpl.startCopyFromBlob(StorageInterfaceImpl.java:399)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2449)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.rename(AzureNativeFileSystemStore.java:2372)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.restoreKey(NativeAzureFileSystem.java:918)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem$NativeAzureFsOutputStream.close(NativeAzureFileSystem.java:819)
    at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
    at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
    at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
    at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
    at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
    at com.univocity.parsers.common.AbstractWriter.close(AbstractWriter.java:876)
    ... 18 more

What's happening is that after creating a temp file with the actual data, it tries to move the file to the location given by the user using CloudBlob.startCopyFromBlob. As usual, Microsoft broke this by renaming the method to CloudBlob.startCopy.

I'm using "org.apache.hadoop" % "hadoop-azure" % "3.2.1", which is the most recent hadoop-azure release, and it seems to have stuck with the older startCopyFromBlob, so I need to use an old azure-storage version that still has this method, probably 2.x.x.

See https://github.com/Azure/azure-storage-java/issues/113
