循环访问数据库中的文件失败 [英] Looping through files in databricks fails
本文介绍了循环访问数据库中的文件失败的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
继续Managing huge zip files in dataBricks
数据库在30个文件后挂起。怎么办?
我已经将巨大的32 GB Zip分成了100个独立的部分。我已经从文件中分离了头文件,因此可以像处理任何CSV文件一样处理它。我需要根据列过滤数据。文件位于Azure Data Lake存储Gen1中,并且必须存储在那里。
在工作约30分钟后,尝试一次读取单个文件(或所有100个文件)失败。(请参阅上面的链接问题。)我所做的:
def lookup_csv(CR_nro, hlo_lista =[], output = my_output_dir ):
base_lib = 'adl://azuredatalakestore.net/<address>'
all_files = pd.DataFrame(dbutils.fs.ls(base_lib + f'CR{CR_nro}'), columns = ['full', 'name', 'size'])
done = pd.DataFrame(dbutils.fs.ls(output), columns = ['full', 'name', 'size'])
all_files = all_files[~all_files['name'].isin(tehdyt['name'].str.replace('/', ''))]
all_files = all_files[~all_files['name'].str.contains('header')]
my_scema = spark.read.csv(base_lib + f'CR{CR_nro}/header.csv', sep=' ', header=True, maxColumns = 1000000).schema
tmp_lst = ['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT'] + [i for i in hlo_lista if i in my_scema.fieldNames()]
for my_file in all_files.iterrows():
print(my_file[1]['name'], time.ctime(time.time()))
data = spark.read.option('comment', '#').option('maxColumns', 1000000).schema(my_scema).csv(my_file[1]['full'], sep=' ').select(tmp_lst)
data.write.csv( output + my_file[1]['name'], header=True, sep=' ')
这行得通...差不多吧。它可以处理大约30个文件,然后挂起
Py4JJava错误:调用o70690.csv时出错。 原因:org.apache.park异常:作业因阶段故障而中止:阶段154.0中的任务0失败了4次,最近一次失败:阶段154.0中丢失的任务0.3(TID1435,10.11.64.46,Executor 7):com.microsoft.azure.datalake.store.ADLException:在创建文件<;my_output_dir>;CR03_pt29.vcf.gz/_started_1438828951154916601时出错 创建操作失败,HTTP401:空 2次尝试后最后一次遇到异常。[HTTP401(空),HTTP401(空)]
我尝试添加一些删除和休眠:
data.unpersist()
data = []
time.sleep(5)
还有一些尝试-异常尝试。
for j in range(1,24):
for i in range(4):
try:
lookup_csv(j, hlo_lista =FN_list, output = blake +f'<my_output>/CR{j}/' )
except Exception as e:
print(i, j, e)
time.sleep(60)
这些都不走运。一旦失败,它就会一直失败。
你知道如何处理这个问题吗?我认为与ADL-Drive的连接在一段时间后失败,但如果我将命令排队:
lookup_csv(<inputs>)
<next cell>
lookup_csv(<inputs>)
它工作、失败和运行Next cell都很好。我可以接受这一点,但令人非常恼火的是,基本循环在这种环境下无法工作。
adsl
最佳解决方案是永久挂载推荐答案存储并使用Azure应用程序。
在Azure中,请转到应用注册-使用名称注册应用,例如";Databricks_mount&qot;。在您的Delta Lake存储中为该应用程序添加IAM角色存储Blob数据参与者(&q;)。
configs = {"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": "<your-client-id>",
"fs.azure.account.oauth2.client.secret": "<your-secret>",
"fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<your-endpoint>/oauth2/token"}
dbutils.fs.mount(
source = "abfss://delta@yourdatalake.dfs.core.windows.net/",
mount_point = "/mnt/delta",
extra_configs = configs)
你可以在不挂载的情况下访问,但你仍然需要注册一个应用程序,并通过笔记本中的Spark设置应用配置才能访问ADLS。由于Azure应用程序:,它在整个会话中应该是永久的
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"),
spark.conf.set("fs.azure.account.oauth2.client.id", "<your-client-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<your-secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<your-endpoint>/oauth2/token")
这个解释是最好的https://docs.databricks.com/data/data-sources/azure/adls-gen2/azure-datalake-gen2-sp-access.html#access-adls-gen2-directly,尽管我记得第一次我也有这个问题。在该页面上还解释了如何注册应用程序。也许这对您的公司政策没什么影响。
这篇关于循环访问数据库中的文件失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文