Upload data to Azure ADLS Gen2 from on-premises using Python or Java

Question

I have an Azure Storage account with Data Lake Gen2. I would like to upload data from on-premises to the Lake Gen2 file systems using Python (or Java).

I have found examples of how to interact with File Shares in the Storage account, but I could not find out how to upload to the Lake (rather than to a File Share). I have also found out how to do it for Gen1 Lakes here, but for Gen2 I found nothing except closed requests.

My question is whether this is even possible with Python as of today. Alternatively, how can I upload files to the Gen2 Lake using Java? A code snippet demonstrating the API calls for the upload would be highly appreciated.

Recommended answer

According to the official tutorial, Multi-protocol access on Data Lake Storage:

Note

The features described in this article are available to accounts that have a hierarchical namespace only if you enroll in the public preview of multi-protocol access on Data Lake Storage. To review limitations, see the known issues article.

So the only solution for uploading data to ADLS Gen2 is to use the ADLS Gen2 REST APIs; please refer to the Azure Data Lake Store REST API reference.
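
As a rough sketch of what those REST calls look like: an upload is a create request, an append, and a flush, all sent to the account's `dfs.core.windows.net` endpoint. The helper names `dfs_url` and `upload_plan` below are hypothetical and only build the method and URL pairs; they send nothing and leave out the `Authorization` header that every real request needs.

```python
from urllib.parse import quote, urlencode


def dfs_url(account_name, fs_name, path="", **params):
    """Build a URL on the ADLS Gen2 dfs endpoint (hypothetical helper, no network I/O)."""
    url = f"https://{account_name}.dfs.core.windows.net/{quote(fs_name)}"
    if path:
        # quote() keeps "/" by default, so directory separators survive.
        url += "/" + quote(path.strip("/"))
    query = urlencode(params)
    return f"{url}?{query}" if query else url


def upload_plan(account_name, fs_name, path, size):
    """List the (HTTP method, URL) pairs for a single-append upload of `size` bytes."""
    return [
        # 1. Create an empty file.
        ("PUT", dfs_url(account_name, fs_name, path, resource="file")),
        # 2. Append the content starting at byte offset 0.
        ("PATCH", dfs_url(account_name, fs_name, path, action="append", position=0)),
        # 3. Flush at the final length so the appended bytes are committed.
        ("PATCH", dfs_url(account_name, fs_name, path, action="flush", position=size)),
    ]


if __name__ == "__main__":
    for method, url in upload_plan("myaccount", "myfs", "mydir/data.csv", 1024):
        print(method, url)
```

The account, filesystem, and path values in the demo are placeholders.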

Here is my sample code for uploading data to ADLS Gen2 in Python, and it works fine.

import requests
import json

def auth(tenant_id, client_id, client_secret):
    print('auth')
    auth_headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    auth_body = {
        "client_id": client_id,
        "client_secret": client_secret,
        "scope" : "https://storage.azure.com/.default",
        "grant_type" : "client_credentials"
    }
    resp = requests.post(f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token", headers=auth_headers, data=auth_body)
    return (resp.status_code, json.loads(resp.text))

def mkfs(account_name, fs_name, access_token):
    print('mkfs')
    fs_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}?resource=filesystem", headers=fs_headers)
    return (resp.status_code, resp.text)

def mkdir(account_name, fs_name, dir_name, access_token):
    print('mkdir')
    dir_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}?resource=directory", headers=dir_headers)
    return (resp.status_code, resp.text)
    
def touch_file(account_name, fs_name, dir_name, file_name, access_token):
    print('touch_file')
    touch_file_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.put(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{dir_name}/{file_name}?resource=file", headers=touch_file_headers)
    return (resp.status_code, resp.text)

def append_file(account_name, fs_name, path, content, position, access_token):
    print('append_file')
    append_file_headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "text/plain",
        "Content-Length": f"{len(content)}"
    }
    resp = requests.patch(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=append&position={position}", headers=append_file_headers, data=content)
    return (resp.status_code, resp.text)
    
def flush_file(account_name, fs_name, path, position, access_token):
    print('flush_file')
    flush_file_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    resp = requests.patch(f"https://{account_name}.dfs.core.windows.net/{fs_name}/{path}?action=flush&position={position}", headers=flush_file_headers)
    return (resp.status_code, resp.text)

def mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token):
    print('mkfile')
    status_code, result = touch_file(account_name, fs_name, dir_name, file_name, access_token)
    if status_code == 201:
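        with open(local_file_name, 'rb') as local_file:
            path = f"{dir_name}/{file_name}"
            content = local_file.read()
            # Append the whole file at offset 0; the service answers 202 when the data is accepted.
            position = 0
            append_status, append_result = append_file(account_name, fs_name, path, content, position, access_token)
            if append_status != 202:
                print(append_result)
                return
            # Flush at the final length to commit the appended data to the file.
            position = len(content)
            flush_status, flush_result = flush_file(account_name, fs_name, path, position, access_token)
            if flush_status != 200:
                print(flush_result)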
    else:
        print(result)
        
    
if __name__ == '__main__':
    tenant_id = '<your tenant id>'
    client_id = '<your client id>'
    client_secret = '<your client secret>'
    
    account_name = '<your adls account name>'
    fs_name = '<your filesystem name>'
    dir_name = '<your directory name>'
    file_name = '<your file name>'
    local_file_name = '<your local file name>'
    
    # Acquire an Access token
    auth_status_code, auth_result = auth(tenant_id, client_id, client_secret)
    # Use the token only if the request succeeded; otherwise fall back to an empty string.
    access_token = auth_result['access_token'] if auth_status_code == 200 else ''
    print(access_token)
    
    # Create a filesystem
    mkfs_status_code, mkfs_result = mkfs(account_name, fs_name, access_token)
    print(mkfs_status_code, mkfs_result)
    
    # Create a directory
    mkdir_status_code, mkdir_result = mkdir(account_name, fs_name, dir_name, access_token)
    print(mkdir_status_code, mkdir_result)
    
    # Create a file from local file
    mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token)
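
The sample above reads the whole local file into memory and sends it in a single append. For larger files, one possible variation is to append fixed-size chunks at their running byte offsets and flush once at the total length. The sketch below is an assumption, not part of the original answer: `iter_chunks` and `upload_in_chunks` are hypothetical names, the 4 MiB default chunk size is arbitrary, and `append` and `flush` are caller-supplied callables (for example thin wrappers around `append_file` and `flush_file`). The demo uses an in-memory file and recording callables, so it makes no network calls.

```python
import io


def iter_chunks(fileobj, chunk_size=4 * 1024 * 1024):
    """Yield (position, chunk) pairs, where position is the chunk's byte offset."""
    position = 0
    while True:
        chunk = fileobj.read(chunk_size)
        if not chunk:
            break
        yield position, chunk
        position += len(chunk)


def upload_in_chunks(fileobj, append, flush, chunk_size=4 * 1024 * 1024):
    """Append every chunk at its offset, then flush once at the total length.

    append(content, position) and flush(position) are supplied by the caller.
    Returns the total number of bytes handed to append.
    """
    total = 0
    for position, chunk in iter_chunks(fileobj, chunk_size):
        append(chunk, position)
        total = position + len(chunk)
    flush(total)
    return total


# Demo with stand-in callables that only record what would be sent.
calls = []
total = upload_in_chunks(
    io.BytesIO(b"abcdefghij"),
    append=lambda content, position: calls.append(("append", position, len(content))),
    flush=lambda position: calls.append(("flush", position)),
    chunk_size=4,
)
print(total, calls)
```

With the functions from the sample, the callables could be written as `lambda content, position: append_file(account_name, fs_name, path, content, position, access_token)` and `lambda position: flush_file(account_name, fs_name, path, position, access_token)`; checking the returned status codes is left to the caller.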

Hope it helps.
