使用逻辑应用程序执行SQL Stored proc并循环遍历每条记录以发送到服务总线 [英] Executing SQL Stored proc with logic app and looping through each record to send to service bus

查看:74
本文介绍了使用逻辑应用程序执行SQL Stored proc并循环遍历每条记录以发送到服务总线的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

所以我正在开发一个执行存储过程(输出为XML)的逻辑应用程序。从那里我试图采取该数据集并将其发送到服务总线。问题是数据集有时可能大于服务总线的最大大小。所以我想要
尝试做的是从SQL获取数据集并循环遍历记录并将它们分批发送或单独发送。 我不知道该怎么做。我已经尝试过使用数据集的每一个,但它发送的数据仍然很大。 
任何输入或帮助都将不胜感激。 谢谢!



" definition":{

        " $ schema":" https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#" ;,

  ;       "行动":{

            "撰写":{

                "输入":" @body('Execute_stored_procedure')?['resultsets']?['Table1']",'
                "runAfter":{

                    "Execute_stored_procedure":[

                        "成功"

                    ]¥b $ b                },b $ b                "type":"Compose"&b
            },b $ b            "条件":{

                "行动":{},

                "else":{

                    "行动":{

                        "Condition_2":{

                            "行动":{},

                            "else":{

                                "行动":{

                                    "For_each":{

                                        "行动":{

                                            "FnCombineMessages_2":{

                                                "输入":{

                                                    "body":" @body('Execute_stored_procedure')?['resultsets']?['Table1']",'
                                                    "功能":{

                                                        " id":" /subscriptions/2cd57834-0c84-4e37-9934-6655e90954e6/resourceGroups/dev-integration-rg/providers/Microsoft.Web/sites/PCNAFunctions-dev/functions/FnCombineMessages"

                                                    }¥b $ b                                                },b $ b                                                "runAfter":{},

                                                "type":""""

                                            },b $ b                                            "Send_message":{

                                                "输入":{

                                                    "body":{

                                                        "ContentData":" @ {base64(body('FnCombineMessages_2'))}"

                                                    },b $ b                                                    "host":{

                                                        "连接":{

                                                            " name":" @parameters('$ connections')['servicebus'] ['connectionId']"

                                                        }¥b $ b                                                    },b $ b                                                    "方法":" post",
                                                    " path":"/ {{{{{{{{{{{                                                   "查询":{

                                                        "systemProperties":"无"和"b $ b                                                    }¥b $ b                                                },b $ b                                                "runAfter":{

                                                    "FnCombineMessages_2":[

                                                        "成功"

                                                    ]¥b $ b                                                },b $ b                                                "type":"ApiConnection"

                                            }¥b $ b                                        },b $ b                                        "foreach":" @outputs('Compose')",$
                                        " operationOptions":" Sequential",
                                        "runAfter":{},

                                        "type":" Foreach"

                                    }¥b $ b                                }¥b $ b                            },b $ b                            "表达":{

                                "和":[

                                    {

                                        "等于":[

                                            "@body('Execute_stored_procedure')?['resultsets']?['Table1']",
                                            "@json('[]')"

                                        ]¥b $ b                                    }¥b $ b                                ]¥b $ b                            },b $ b                            "runAfter":{},

                            "type":"if""

                        }¥b $ b                    }¥b $ b                },b $ b                "表达":{

                    "和":[

                        {

                            "等于":[

                                "@body('Execute_stored_procedure')?['resultsets']?['Table1']",
                                0

                            ]¥b $ b                        }¥b $ b                    ]¥b $ b                },b $ b                "runAfter":{

                    "Execute_a_SQL_query":[

                        "成功"

                    ]¥b $ b                },b $ b                "type":"if""

            },b $ b            "Execute_a_SQL_query":{

                "输入":{

                    "body":{

                        " query":" select count(*)from dbo.Staging_PIMSupplementalItem"

                    },b $ b                    "host":{

                        "连接":{

                            " name":" @parameters('$ connections')['sql'] ['connectionId']"

                        }¥b $ b                    },b $ b                    "方法":" post",
                    " path" ;:\"/ datasets / default / query / sql"

                },b $ b                "runAfter":{

                    "撰写":[

                        "成功"

                    ]¥b $ b                },b $ b                "type":"ApiConnection"

            },b $ b            "Execute_stored_procedure":{

                "输入":{

                    "host":{

                        "连接":{

                            " name":" @parameters('$ connections')['sql'] ['connectionId']"

                        }¥b $ b                    },b $ b                    "方法":" post",
                    " path":" / datasets / default / procedures / @ {encodeURIComponent(encodeURIComponent('[dbo]。[CreatePIMSupplementalItemDelta]'))}"
$
                },b $ b                "runAfter":{

                    "Initialize_variable":[

                        "成功"

                    ]¥b $ b                },b $ b                "type":"ApiConnection"

            },b $ b            "FnPIMSupplementalItemTableSync":{

                "输入":{

                    "body":{},

                    "功能":{

                        " id":" /subscriptions/2cd57834-0c84-4e37-9934-6655e90954e6/resourceGroups/dev-integration-rg/providers/Microsoft.Web/sites/PCNAFunctions-dev/functions/FnPIMSupplementalItemTableSync"

                    }¥b $ b                },b $ b                "runAfter":{

                    "条件":[

                        "成功"

                    ]¥b $ b                },b $ b                "type":""""

            },b $ b            "Initialize_variable":{

                "输入":{

                    "变量":[

                        {

                            " name":" count",
                            "type":"Integer","
                            "价值":1

                        }¥b $ b                    ]¥b $ b                },b $ b                "runAfter":{},

                "type":" InitializeVariable"

            }¥b $ b        },b $ b        "contentVersion":" 1.0.0.0",$
        "输出":{},

        "参数":{

            "$ connections":{

                " defaultValue":{},

                "type":" Object"

            }¥b $ b        },b $ b        "触发器":{

            "重复发生":{

                "重复发生":{

                    "频率":"日",

                    "interval":1,

                    "schedule":{

                        "小时":[

                            "20"

                        ],
                        "分钟":[

                            0

                        ]¥b $ b                    },b $ b                    "startTime":" 2018-06-06T15:00:00Z",

                    "timeZone":"东部标准时间"

                },b $ b                "type":"Recurrence"和
            }¥b $ b        }¥b $ b    }

解决方案

如果邮件大小超过允许的最大大小  [最大邮件大小:256服务总线队列中的KB(标准层)/ 1 MB(高级层)]  您必须通过以下方式处理大型Service Bus消息:


- 减少邮件大小:查看不必要的内容并将其从邮件中删除。



-Optimal Serialization:例如,XML序列化不太适合添加到每个属性以及打开和关闭标记的开始和结束标记。

<声明检查模式:通过将消息数据存储在持久存储中并将引用传递给持久数据以及消息来实现模式。接收方负责从之前使用的持久性
存储中检索消息数据,并将其与消息进行协调。


您可以阅读有关声明检查模式的更多信息在下面的文件中。



https://github.com/Azure/azure-service-bus-dotnet-plugins#third-party-provided-plugins


https://github.com/SeanFeldman/ServiceBus.AttachmentPlugin/blob/develop/README .md


https://www.serverless360 .com / blog / deal-with-large-service-bus-messages-using-claim-check-pattern


So i'm working on a logic app that executes a stored proc (output as XML). from there i'm trying to take that data-set and send it to a service bus. the problem is the data set can sometimes be larger than the max size for a service bus. So what i wanted to try to do was get the data-set back from SQL and loop through the records and send them either batches or send them individual.  i'm not sure how to do it. I've tried using a for each using the data-set but the data it is sending its still massive.  Any input or help would be appreciated.  Thank you!

"definition": {
        "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
        "actions": {
            "Compose": {
                "inputs": "@body('Execute_stored_procedure')?['resultsets']?['Table1']",
                "runAfter": {
                    "Execute_stored_procedure": [
                        "Succeeded"
                    ]
                },
                "type": "Compose"
            },
            "Condition": {
                "actions": {},
                "else": {
                    "actions": {
                        "Condition_2": {
                            "actions": {},
                            "else": {
                                "actions": {
                                    "For_each": {
                                        "actions": {
                                            "FnCombineMessages_2": {
                                                "inputs": {
                                                    "body": "@body('Execute_stored_procedure')?['resultsets']?['Table1']",
                                                    "function": {
                                                        "id": "/subscriptions/2cd57834-0c84-4e37-9934-6655e90954e6/resourceGroups/dev-integration-rg/providers/Microsoft.Web/sites/PCNAFunctions-dev/functions/FnCombineMessages"
                                                    }
                                                },
                                                "runAfter": {},
                                                "type": "Function"
                                            },
                                            "Send_message": {
                                                "inputs": {
                                                    "body": {
                                                        "ContentData": "@{base64(body('FnCombineMessages_2'))}"
                                                    },
                                                    "host": {
                                                        "connection": {
                                                            "name": "@parameters('$connections')['servicebus']['connectionId']"
                                                        }
                                                    },
                                                    "method": "post",
                                                    "path": "/@{encodeURIComponent(encodeURIComponent('outboundpcnapimsupplementalitemqueue'))}/messages",
                                                    "queries": {
                                                        "systemProperties": "None"
                                                    }
                                                },
                                                "runAfter": {
                                                    "FnCombineMessages_2": [
                                                        "Succeeded"
                                                    ]
                                                },
                                                "type": "ApiConnection"
                                            }
                                        },
                                        "foreach": "@outputs('Compose')",
                                        "operationOptions": "Sequential",
                                        "runAfter": {},
                                        "type": "Foreach"
                                    }
                                }
                            },
                            "expression": {
                                "and": [
                                    {
                                        "equals": [
                                            "@body('Execute_stored_procedure')?['resultsets']?['Table1']",
                                            "@json('[]')"
                                        ]
                                    }
                                ]
                            },
                            "runAfter": {},
                            "type": "If"
                        }
                    }
                },
                "expression": {
                    "and": [
                        {
                            "equals": [
                                "@body('Execute_stored_procedure')?['resultsets']?['Table1']",
                                0
                            ]
                        }
                    ]
                },
                "runAfter": {
                    "Execute_a_SQL_query": [
                        "Succeeded"
                    ]
                },
                "type": "If"
            },
            "Execute_a_SQL_query": {
                "inputs": {
                    "body": {
                        "query": "Select Count(*) from dbo.Staging_PIMSupplementalItem"
                    },
                    "host": {
                        "connection": {
                            "name": "@parameters('$connections')['sql']['connectionId']"
                        }
                    },
                    "method": "post",
                    "path": "/datasets/default/query/sql"
                },
                "runAfter": {
                    "Compose": [
                        "Succeeded"
                    ]
                },
                "type": "ApiConnection"
            },
            "Execute_stored_procedure": {
                "inputs": {
                    "host": {
                        "connection": {
                            "name": "@parameters('$connections')['sql']['connectionId']"
                        }
                    },
                    "method": "post",
                    "path": "/datasets/default/procedures/@{encodeURIComponent(encodeURIComponent('[dbo].[CreatePIMSupplementalItemDelta]'))}"
                },
                "runAfter": {
                    "Initialize_variable": [
                        "Succeeded"
                    ]
                },
                "type": "ApiConnection"
            },
            "FnPIMSupplementalItemTableSync": {
                "inputs": {
                    "body": {},
                    "function": {
                        "id": "/subscriptions/2cd57834-0c84-4e37-9934-6655e90954e6/resourceGroups/dev-integration-rg/providers/Microsoft.Web/sites/PCNAFunctions-dev/functions/FnPIMSupplementalItemTableSync"
                    }
                },
                "runAfter": {
                    "Condition": [
                        "Succeeded"
                    ]
                },
                "type": "Function"
            },
            "Initialize_variable": {
                "inputs": {
                    "variables": [
                        {
                            "name": "count",
                            "type": "Integer",
                            "value": 1
                        }
                    ]
                },
                "runAfter": {},
                "type": "InitializeVariable"
            }
        },
        "contentVersion": "1.0.0.0",
        "outputs": {},
        "parameters": {
            "$connections": {
                "defaultValue": {},
                "type": "Object"
            }
        },
        "triggers": {
            "Recurrence": {
                "recurrence": {
                    "frequency": "Day",
                    "interval": 1,
                    "schedule": {
                        "hours": [
                            "20"
                        ],
                        "minutes": [
                            0
                        ]
                    },
                    "startTime": "2018-06-06T15:00:00Z",
                    "timeZone": "Eastern Standard Time"
                },
                "type": "Recurrence"
            }
        }
    }

解决方案

If the message size exceeds maximum allowed size [Maximum message size: 256 KB (Standard tier) / 1 MB (Premium tier)] in service bus queue you have to how to handle large Service Bus messages by following ways:

-Reduce the message size: Review anything that is not necessary and remove it from the message.

-Optimal Serialization: for example, the XML serialization is less desirable for its opening and closing tags that add to every attribute and opening and closing tags.

-Using Claim Check Pattern: The pattern is implemented by storing message data in a persistent store and passing a reference to the persisted data along with the message. The receiver is responsible for retrieving message data from the persistence store used earlier and reconciling it with the message.

You can read more about Claim Check Pattern in below documents.

https://github.com/Azure/azure-service-bus-dotnet-plugins#third-party-provided-plugins

https://github.com/SeanFeldman/ServiceBus.AttachmentPlugin/blob/develop/README.md

https://www.serverless360.com/blog/deal-with-large-service-bus-messages-using-claim-check-pattern


这篇关于使用逻辑应用程序执行SQL Stored proc并循环遍历每条记录以发送到服务总线的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆