Transform data in Azure Data Factory using Python Databricks


Question

I have the task of transforming and consolidating millions of single JSON files into big CSV files.

The operation would be very simple using a copy activity and mapping the schemas; I have already tested this. The problem is that a massive number of the files have bad JSON format.

I know what the error is, and the fix is very simple too. I figured that I could use a Python Databricks activity to fix the strings and then pass the output to a copy activity that could consolidate the records into a big CSV file.
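As a purely hypothetical illustration (the actual formatting error is not shown here), the repair step in a Python notebook could look something like this, assuming the problem were, say, trailing commas before a closing brace:

import json
import re

def fix_bad_json(raw: str) -> str:
    # Hypothetical repair: strip trailing commas such as in {"a": 1,}
    # Swap this for whatever the real formatting error actually requires.
    return re.sub(r",\s*([}\]])", r"\1", raw)

record = json.loads(fix_bad_json('{"id": 1, "name": "x",}'))  # parses cleanly after the repair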

I have something like this in mind, but I'm not sure if it is the proper way to address the task. I also don't know how to use the output of the Copy Activity in the Databricks activity.

Answer

It sounds like you want to transform a large number of single JSON files using Azure Data Factory, but as @KamilNowinski said, Azure Data Factory does not support that today. However, since you are already using Azure Databricks, writing a simple Python script to do the same thing is easier. So a workaround is to use the Azure Storage SDK and the pandas Python package directly, in a few steps on Azure Databricks.

  1. These JSON files are presumably all in one container of Azure Blob Storage, so you need to list them in the container via list_blob_names and generate their URLs with a SAS token for the pandas read_json function, as in the code below.

from azure.storage.blob.baseblobservice import BaseBlobService
from azure.storage.blob import ContainerPermissions
from datetime import datetime, timedelta

account_name = '<your account name>'
account_key = '<your account key>'
container_name = '<your container name>'

# Generate a read-only SAS token for the container, valid for one hour
service = BaseBlobService(account_name=account_name, account_key=account_key)
token = service.generate_container_shared_access_signature(container_name, permission=ContainerPermissions.READ, expiry=datetime.utcnow() + timedelta(hours=1))

# List every blob in the container and build a SAS-signed URL for each one
# (note: this is a generator, so it can only be iterated once)
blob_names = service.list_blob_names(container_name)
blob_urls_with_token = (f"https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}?{token}" for blob_name in blob_names)

#print(list(blob_urls_with_token))

  • Then you can read these JSON files directly from the blobs via the read_json function to create their pandas DataFrames.

    import pandas as pd

    # Read each JSON blob into its own DataFrame and collect them in a list
    dfs = []
    for blob_url_with_token in blob_urls_with_token:
        dfs.append(pd.read_json(blob_url_with_token))

    Even if you want to merge them into one big CSV file, you can first merge them into one big DataFrame via the pandas functions listed under Combining / joining / merging, such as append or concat, as sketched below.
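    A minimal sketch of that merge, assuming the dfs list collected in the loop above:

    # Concatenate all per-file DataFrames into one big DataFrame
    df = pd.concat(dfs, ignore_index=True)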

    To write the DataFrame to a CSV file, I think it's very easy with the to_csv function. Or you can convert the pandas DataFrame to a PySpark DataFrame on Azure Databricks, as in the code below.

    from pyspark.sql import SparkSession

    # On Azure Databricks a SparkSession is already running; getOrCreate() reuses it
    # instead of trying to start a second SparkContext.
    spark = SparkSession.builder.getOrCreate()
    spark_df = spark.createDataFrame(df)
    

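    For writing the result out, a minimal sketch (the output paths are placeholders, not from the original answer):

    # pandas: write one CSV file to the Databricks local file system (DBFS)
    df.to_csv('/dbfs/tmp/consolidated.csv', index=False)

    # PySpark: write the Spark DataFrame as CSV (produces a folder of part files)
    spark_df.write.mode('overwrite').csv('/tmp/consolidated_csv', header=True)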
  • So next, whatever you want to do is simple. And if you want to schedule the script as a notebook in Azure Databricks, you can refer to the official document Jobs to run Spark jobs.

    Hope it helps.
