How can I consume this REST API in Azure Data Factory?


Question

I have a REST API that I need to call from Azure Data Factory, inserting the returned data into a SQL table.

The JSON returned from the API has the following format:

{
    "serviceResponse": {
        "supportOffice": "EUKO",
        "totalPages": 5,
        "pageNo": 1,
        "recordsPerPage": 1000,
        "projects": [
            { "projectID":1 ...} , { "projectID":2 ...} ,...
        ]
    }
}

The URL has the format http://server.com/api/Projects?pageNo=1

I have managed to set up a RestService that calls the API and returns the JSON, and a SQL Sink that takes the JSON and passes it to a stored procedure, which then stores the data.

However, what I am struggling with is how to handle the pagination.

I have tried:

  1. Pagination options on the RestService: I don't think this will work, as it only allows for an XPath that returns the full next URL. I can't see how it would allow the URL to be computed from totalPages and pageNo (or I couldn't get it to work).

I tried adding a Web call to the API before the processing to calculate the number of pages. While not ideal, it did work until I hit the 1 MB/1 min limit, as some responses are quite big. So this is not going to work.

I've tried to see whether the API could be changed, but that is not possible.

I was wondering if anyone has any ideas on how I could get this working, or has successfully consumed a similar API?

Recommended answer

The following explanation walks through creating the pipeline. Notice that it uses Stored Procedure activities, Web activities, and a ForEach activity.

First, provision an Azure SQL DB, set up the AAD administrator, then grant the ADF MSI permissions in the database. Then create the following table and two stored procedures:

CREATE TABLE [dbo].[People](
    [id] [int] NULL,
    [email] [varchar](255) NULL,
    [first_name] [varchar](100) NULL,
    [last_name] [varchar](100) NULL,
    [avatar] [nvarchar](1000) NULL
)

GO
/*
sample call:
exec uspInsertPeople @json = '{"page":1,"per_page":3,"total":12,"total_pages":4,"data":[{"id":1,"email":"george.bluth@reqres.in","first_name":"George","last_name":"Bluth","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/calebogden/128.jpg"},{"id":2,"email":"janet.weaver@reqres.in","first_name":"Janet","last_name":"Weaver","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/josephstein/128.jpg"},{"id":3,"email":"emma.wong@reqres.in","first_name":"Emma","last_name":"Wong","avatar":"https://s3.amazonaws.com/uifaces/faces/twitter/olegpogodaev/128.jpg"}]}'
*/
create proc uspInsertPeople @json nvarchar(max)
as
begin
insert into People (id, email, first_name, last_name, avatar)
select d.*
from OPENJSON(@json)
WITH (
        [data] nvarchar(max) '$.data' as JSON
)
CROSS APPLY OPENJSON([data], '$')
    WITH (
        id int '$.id',
        email varchar(255) '$.email',
        first_name varchar(100) '$.first_name',
        last_name varchar(100) '$.last_name',
        avatar nvarchar(1000) '$.avatar'
    ) d;
end

GO

create proc uspTruncatePeople
as
truncate table People
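
To make the JSON handling in uspInsertPeople easier to follow, here is a minimal Python sketch of the same extraction, not part of the original answer. It assumes the payload has a top-level "data" array as in the sample call above. The names `COLUMNS`, `extract_people` and `sample` are hypothetical, and the treatment of missing keys as None is meant to parallel what OPENJSON ... WITH is understood to do in its default lax mode.

```python
import json

# Column order of the People table from the answer (hypothetical constant name).
COLUMNS = ("id", "email", "first_name", "last_name", "avatar")


def extract_people(payload: str) -> list:
    """Return one tuple per element of the top-level "data" array.

    Keys missing from an element become None, roughly like a NULL column
    in the stored procedure's OPENJSON ... WITH clause.
    """
    document = json.loads(payload)
    rows = document.get("data") or []  # no "data" array means no rows
    return [tuple(row.get(col) for col in COLUMNS) for row in rows]


# Trimmed version of the sample call in the SQL comment (avatar left out on purpose).
sample = (
    '{"page":1,"per_page":3,"total":12,"total_pages":4,'
    '"data":[{"id":1,"email":"george.bluth@reqres.in",'
    '"first_name":"George","last_name":"Bluth"}]}'
)

rows = extract_people(sample)
```

Each tuple in `rows` corresponds to one row the stored procedure would insert into People for the same payload.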


Next, in Azure Data Factory v2, create a new pipeline, rename it to ForEachPage, then go to the Code view and paste in the following JSON:

{
    "name": "ForEachPage",
    "properties": {
        "activities": [
            {
                "name": "GetTotalPages",
                "type": "WebActivity",
                "dependsOn": [
                    {
                        "activity": "Truncate SQL Table",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "url": {
                        "value": "https://reqres.in/api/users?page=1",
                        "type": "Expression"
                    },
                    "method": "GET"
                }
            },
            {
                "name": "ForEachPage",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "GetTotalPages",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@range(1,activity('GetTotalPages').output.total_pages)",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "GetPage",
                            "type": "WebActivity",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "url": {
                                    "value": "@concat('https://reqres.in/api/users?page=',item())",
                                    "type": "Expression"
                                },
                                "method": "GET"
                            }
                        },
                        {
                            "name": "uspInsertPeople stored procedure",
                            "type": "SqlServerStoredProcedure",
                            "dependsOn": [
                                {
                                    "activity": "GetPage",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "7.00:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "storedProcedureName": "[dbo].[uspInsertPeople]",
                                "storedProcedureParameters": {
                                    "json": {
                                        "value": {
                                            "value": "@string(activity('GetPage').output)",
                                            "type": "Expression"
                                        },
                                        "type": "String"
                                    }
                                }
                            },
                            "linkedServiceName": {
                                "referenceName": "lsAzureDB",
                                "type": "LinkedServiceReference"
                            }
                        }
                    ]
                }
            },
            {
                "name": "Truncate SQL Table",
                "type": "SqlServerStoredProcedure",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "storedProcedureName": "[dbo].[uspTruncatePeople]"
                },
                "linkedServiceName": {
                    "referenceName": "lsAzureDB",
                    "type": "LinkedServiceReference"
                }
            }
        ],
        "annotations": []
    }
}
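
The control flow of the pipeline above can be sketched in plain Python, purely as an illustration. The helper names (`adf_range`, `page_urls`, `run_pipeline`) and the `fetch`, `insert` and `truncate` callables are hypothetical stand-ins for the Web and Stored Procedure activities; nothing here calls Azure. One detail worth noting is that the ADF expression `range(1, n)` takes a start and a count, whereas Python's `range` takes a start and an exclusive stop.

```python
BASE_URL = "https://reqres.in/api/users?page="  # URL prefix used in the pipeline JSON


def adf_range(start: int, count: int) -> list:
    """Mimic the ADF expression range(start, count): `count` integers from `start`."""
    return list(range(start, start + count))


def page_urls(total_pages: int) -> list:
    """Build one URL per page, like the @concat(...) expression in GetPage."""
    return [f"{BASE_URL}{page}" for page in adf_range(1, total_pages)]


def run_pipeline(fetch, insert, truncate) -> int:
    """Run the same steps as the pipeline and return the number of pages processed.

    fetch(url) -> dict, insert(page_dict) and truncate() are supplied by the caller.
    """
    truncate()                             # "Truncate SQL Table"
    first = fetch(f"{BASE_URL}1")          # "GetTotalPages"
    urls = page_urls(first["total_pages"])
    for url in urls:                       # "ForEachPage"
        insert(fetch(url))                 # "GetPage", then "uspInsertPeople stored procedure"
    return len(urls)
```

As in the pipeline, page 1 is requested twice: once to read total_pages and once inside the loop. To try the sketch, pass in-memory stubs for `fetch`, `insert` and `truncate`.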

Create an lsAzureDB linked service to the Azure SQL DB, setting it to use the MSI for authentication.

This pipeline calls a sample paged API (which works at the moment, but it is not an API I manage, so it may stop working at some point) to demonstrate how to loop, take the results of the Web activities, and insert them into a SQL table via a stored procedure call and JSON parsing in the stored procedure. The loop runs in parallel, but you could change the settings on the ForEachPage activity to make it run sequentially.
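
The sample API differs in shape from the API in the question, where the page count sits under serviceResponse.totalPages and the page parameter is pageNo. As a hedged sketch only, the Python below shows the same idea applied to that shape. The function names are hypothetical, and the sample payload is trimmed to the projectID field because the question elides the rest. In the pipeline, the analogous changes would presumably be reading `output.serviceResponse.totalPages` in the ForEach items expression and pointing the stored procedure's JSON path at `$.serviceResponse.projects`; both are assumptions, not something the original answer states.

```python
import json

QUESTION_URL = "http://server.com/api/Projects?pageNo="  # URL format given in the question


def total_pages(first_page: str) -> int:
    """Read the page count from the nested serviceResponse object."""
    return int(json.loads(first_page)["serviceResponse"]["totalPages"])


def project_page_urls(first_page: str) -> list:
    """Compute every page URL from totalPages, starting at pageNo=1."""
    return [f"{QUESTION_URL}{n}" for n in range(1, total_pages(first_page) + 1)]


def projects(page: str) -> list:
    """Return the projects array of one page, or an empty list if it is absent."""
    return json.loads(page)["serviceResponse"].get("projects", [])


# Trimmed sample in the shape shown in the question (only projectID kept).
sample_first_page = json.dumps({
    "serviceResponse": {
        "supportOffice": "EUKO",
        "totalPages": 5,
        "pageNo": 1,
        "recordsPerPage": 1000,
        "projects": [{"projectID": 1}, {"projectID": 2}],
    }
})

urls = project_page_urls(sample_first_page)
```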
