How can I consume this REST API in Azure Data Factory

Question

I have a REST API I need to call from Azure Data Factory and insert the data into a SQL table.

The API returns JSON in the following format:

{
    "serviceResponse": {
        "supportOffice": "EUKO",
        "totalPages": 5,
        "pageNo": 1,
        "recordsPerPage": 1000,
        "projects": [
            { "projectID":1 ...} , { "projectID":2 ...} ,...
        ]
    }
}

The URL is in the format http://server.com/api/Projects?pageNo=1

I have managed to set up a RestService to call the API and return the JSON, and a SQL sink that takes the JSON and passes it to a stored procedure, which then stores the data.

However, what I am struggling with is how to handle the pagination.

I have tried:

  1. Pagination options on the RestService: I don't think this will work, as it only allows for an XPath that returns the full next URL. I can't see how it would allow the URL to be computed from totalPages and pageNo (or I couldn't get it to work).

  2. I tried adding a Web call to the API before the processing to calculate the number of pages. While not ideal, it did work until I hit the 1 MB/1 min limit, as some responses are quite big. This is not going to work.

  3. I've tried to see if the API could change, but that is not possible.

I was wondering if anyone has any ideas on how I could get this working, or has successfully consumed a similar API?

Solution

The following explanation walks through creating a pipeline that uses Stored Procedure activities, Web activities, and a ForEach activity.

First, provision an Azure SQL DB, set up the AAD administrator, then grant the ADF MSI permissions in the database as described here. Then create the following table and two stored procedures:

CREATE TABLE [dbo].[People](
    [id] [int] NULL,
    [email] [varchar](255) NULL,
    [first_name] [varchar](100) NULL,
    [last_name] [varchar](100) NULL,
    [avatar] [nvarchar](1000) NULL
)

GO
/*
sample call:
exec uspInsertPeople @json = '{"page":1,"per_page":3,"total":12,"total_pages":4,"data":[{"id":1,"email":"george.bluth@reqres.in","first_name":"George","last_name":"Bluth","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/calebogden/128.jpg"},{"id":2,"email":"janet.weaver@reqres.in","first_name":"Janet","last_name":"Weaver","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/josephstein/128.jpg"},{"id":3,"email":"emma.wong@reqres.in","first_name":"Emma","last_name":"Wong","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/olegpogodaev/128.jpg"}]}'
*/
create proc uspInsertPeople @json nvarchar(max)
as
begin
insert into People (id, email, first_name, last_name, avatar)
select d.*
from OPENJSON(@json)
WITH (
        [data] nvarchar(max) '$.data' as JSON
)
CROSS APPLY OPENJSON([data], '$')
    WITH (
        id int '$.id',
        email varchar(255) '$.email',
        first_name varchar(100) '$.first_name',
        last_name varchar(100) '$.last_name',
        avatar nvarchar(1000) '$.avatar'
    ) d;
end

GO

create proc uspTruncatePeople
as
truncate table People


Next, in Azure Data Factory v2, create a new pipeline, rename it to ForEachPage, then go to the Code view and paste in the following JSON:

{
    "name": "ForEachPage",
    "properties": {
        "activities": [
            {
                "name": "GetTotalPages",
                "type": "WebActivity",
                "dependsOn": [
                    {
                        "activity": "Truncate SQL Table",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "url": {
                        "value": "https://reqres.in/api/users?page=1",
                        "type": "Expression"
                    },
                    "method": "GET"
                }
            },
            {
                "name": "ForEachPage",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "GetTotalPages",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@range(1,activity('GetTotalPages').output.total_pages)",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "GetPage",
                            "type": "WebActivity",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "url": {
                                    "value": "@concat('https://reqres.in/api/users?page=',item())",
                                    "type": "Expression"
                                },
                                "method": "GET"
                            }
                        },
                        {
                            "name": "uspInsertPeople stored procedure",
                            "type": "SqlServerStoredProcedure",
                            "dependsOn": [
                                {
                                    "activity": "GetPage",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "storedProcedureName": "[dbo].[uspInsertPeople]",
                                "storedProcedureParameters": {
                                    "json": {
                                        "value": {
                                            "value": "@string(activity('GetPage').output)",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    }
                                }
                            },
                            "linkedServiceName": {
                                "referenceName": "lsAzureDB",
                                "type": "LinkedServiceReference"
                            }
                        }
                    ]
                }
            },
            {
                "name": "Truncate SQL Table",
                "type": "SqlServerStoredProcedure",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "storedProcedureName": "[dbo].[uspTruncatePeople]"
                },
                "linkedServiceName": {
                    "referenceName": "lsAzureDB",
                    "type": "LinkedServiceReference"
                }
            }
        ],
        "annotations": []
    }
}
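
The pipeline above targets the sample API. For the API described in the question, the two expressions that read the page count and build the page URL would presumably change along the following lines. This is a trimmed sketch rather than a drop-in replacement: the pipeline name ForEachProjectPage is hypothetical, the policy blocks and both stored procedure activities are left out for brevity, and the URL and property names (serviceResponse.totalPages, pageNo) are taken from the question. Note that range(startIndex, count) returns count integers starting at startIndex, so range(1, 5) yields pages 1 through 5.

```json
{
    "name": "ForEachProjectPage",
    "properties": {
        "description": "Hypothetical adaptation for the API in the question; stored procedure activities omitted",
        "activities": [
            {
                "name": "GetTotalPages",
                "type": "WebActivity",
                "dependsOn": [],
                "typeProperties": {
                    "url": {
                        "value": "http://server.com/api/Projects?pageNo=1",
                        "type": "Expression"
                    },
                    "method": "GET"
                }
            },
            {
                "name": "ForEachPage",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "GetTotalPages",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "typeProperties": {
                    "items": {
                        "value": "@range(1,activity('GetTotalPages').output.serviceResponse.totalPages)",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "GetPage",
                            "type": "WebActivity",
                            "dependsOn": [],
                            "typeProperties": {
                                "url": {
                                    "value": "@concat('http://server.com/api/Projects?pageNo=',item())",
                                    "type": "Expression"
                                },
                                "method": "GET"
                            }
                        }
                    ]
                }
            }
        ]
    }
}
```

With this shape, the OPENJSON path in the insert procedure would need to read '$.serviceResponse.projects' instead of '$.data', and the column list would have to match the project fields, which the question does not show. The Web activity response size limit mentioned in the question would still apply to each page.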

Create an lsAzureDB linked service to the Azure SQL DB and set it to use the MSI for authentication.

This pipeline calls a sample paged API (which works at the moment, but it is not an API I manage, so it may stop working at some point) to demonstrate how to loop, and how to take the results of the Web activities and insert them into a SQL table via a stored procedure call, with the JSON parsing done in the stored procedure. The loop will run in parallel, but you can change the settings on the ForEachPage activity to make it run serially.
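
As a minimal sketch of the serial option, the ForEach activity accepts an isSequential flag in its typeProperties. The fragment below shows only the properties of interest; the dependsOn block and the inner activities array are assumed to stay exactly as in the pipeline above, so this fragment is not complete on its own.

```json
{
    "name": "ForEachPage",
    "type": "ForEach",
    "description": "Sketch only: dependsOn and the inner activities are unchanged and omitted here",
    "typeProperties": {
        "isSequential": true,
        "items": {
            "value": "@range(1,activity('GetTotalPages').output.total_pages)",
            "type": "Expression"
        }
    }
}
```

To keep the loop parallel but cap how many pages are fetched at once, leave isSequential as false and add a "batchCount" value (an integer, up to 50) next to it. That may be a gentler choice if the API or the database struggles with many simultaneous calls.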
