循环访问数据库中的文件失败 [英] Looping through files in databricks fails

查看:0
本文介绍了循环访问数据库中的文件失败的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

继续Managing huge zip files in dataBricks

数据库在30个文件后挂起。怎么办?

我已经将巨大的32 GB Zip分成了100个独立的部分。我已经从文件中分离了头文件,因此可以像处理任何CSV文件一样处理它。我需要根据列过滤数据。文件位于Azure Data Lake存储Gen1中,并且必须存储在那里。

在工作约30分钟后,尝试一次读取单个文件(或所有100个文件)失败。(请参阅上面的链接问题。)

我所做的:

def lookup_csv(CR_nro, hlo_lista =[], output = my_output_dir ): 

  base_lib = 'adl://azuredatalakestore.net/<address>'
  all_files = pd.DataFrame(dbutils.fs.ls(base_lib + f'CR{CR_nro}'), columns = ['full', 'name', 'size'])
  done = pd.DataFrame(dbutils.fs.ls(output), columns = ['full', 'name', 'size'])
  all_files = all_files[~all_files['name'].isin(tehdyt['name'].str.replace('/', ''))]
  all_files = all_files[~all_files['name'].str.contains('header')]

  my_scema = spark.read.csv(base_lib + f'CR{CR_nro}/header.csv', sep='	', header=True, maxColumns = 1000000).schema
  tmp_lst = ['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT'] + [i for i in hlo_lista  if i in my_scema.fieldNames()]

  for my_file in all_files.iterrows(): 
    print(my_file[1]['name'], time.ctime(time.time()))
    data = spark.read.option('comment', '#').option('maxColumns', 1000000).schema(my_scema).csv(my_file[1]['full'], sep='	').select(tmp_lst)
    data.write.csv( output + my_file[1]['name'], header=True, sep='	')

这行得通...差不多吧。它可以处理大约30个文件,然后挂起

Py4JJava错误:调用o70690.csv时出错。​ 原因:org.apache.park异常:作业因阶段故障而中止:阶段154.0中的任务0失败了4次,最近一次失败:阶段154.0中丢失的任务0.3(TID1435,10.11.64.46,Executor 7):com.microsoft.azure.datalake.store.ADLException:在创建文件<;my_output_dir>;CR03_pt29.vcf.gz/_started_1438828951154916601时出错 创建操作失败,HTTP401:空 2次尝试后最后一次遇到异常。[HTTP401(空),HTTP401(空)]

我尝试添加一些删除和休眠:

   ​data.unpersist()
   ​data = []
   ​time.sleep(5)

还有一些尝试-异常尝试。

for j in range(1,24): 
    for i in range(4): 
        try: 
            lookup_csv(j, hlo_lista =FN_list, output = blake +f'<my_output>/CR{j}/' )
        except Exception as e:
            print(i, j, e)
            time.sleep(60)

这些都不走运。一旦失败,它就会一直失败。

你知道如何处理这个问题吗?我认为与ADL-Drive的连接在一段时间后失败,但如果我将命令排队:

lookup_csv(<inputs>) 
<next cell> 
lookup_csv(<inputs>) 

它工作、失败和运行Next cell都很好。我可以接受这一点,但令人非常恼火的是,基本循环在这种环境下无法工作。

adsl

最佳解决方案是永久挂载推荐答案存储并使用Azure应用程序。

在Azure中,请转到应用注册-使用名称注册应用,例如";Databricks_mount&qot;。在您的Delta Lake存储中为该应用程序添加IAM角色存储Blob数据参与者(&q;)。

configs = {"fs.azure.account.auth.type": "OAuth",
      "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", 
      "fs.azure.account.oauth2.client.id": "<your-client-id>",
      "fs.azure.account.oauth2.client.secret": "<your-secret>",
      "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<your-endpoint>/oauth2/token"}

dbutils.fs.mount(
 source = "abfss://delta@yourdatalake.dfs.core.windows.net/",
 mount_point = "/mnt/delta",
 extra_configs = configs)
你可以在不挂载的情况下访问,但你仍然需要注册一个应用程序,并通过笔记本中的Spark设置应用配置才能访问ADLS。由于Azure应用程序:

,它在整个会话中应该是永久的
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"), 
spark.conf.set("fs.azure.account.oauth2.client.id", "<your-client-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<your-secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<your-endpoint>/oauth2/token")

这个解释是最好的https://docs.databricks.com/data/data-sources/azure/adls-gen2/azure-datalake-gen2-sp-access.html#access-adls-gen2-directly,尽管我记得第一次我也有这个问题。在该页面上还解释了如何注册应用程序。也许这对您的公司政策没什么影响。

这篇关于循环访问数据库中的文件失败的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆