Only Fan-out (and forget) in Durable Functions


Question

I have an existing Function App with 2 Functions and a storage queue. F1 is triggered by a message in a Service Bus topic. For each message received, F1 calculates some sub-tasks (T1, T2, ...) which have to be executed with varying amounts of delay. For example, T1 is to be fired after 3 minutes, T2 after 5 minutes, and so on. F1 posts messages to a storage queue with appropriate visibility timeouts (to simulate the delay), and F2 is triggered whenever a message becomes visible in the queue. All works fine.

I now want to migrate this app to use 'Durable Functions'. F1 now only starts the orchestrator. The orchestrator code looks something like this:

    public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
        List<Task> tasks = new List<Task>();
        foreach (var value in results)
        {
            var pnTask = context.CallActivityAsync("PerformSubTask", value);
            tasks.Add(pnTask);
        }

        // Don't await, as we want to fire and forget. No fan-in!
        //await Task.WhenAll(tasks);
    }

    [FunctionName("PerformSubTask")]
    public async static Task Run([ActivityTrigger]TaskInfo info, TraceWriter log)
    {
         TimeSpan timeDifference = DateTime.UtcNow - info.Origin.ToUniversalTime();
         TimeSpan delay = TimeSpan.FromSeconds(info.DelayInSeconds);
         var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

         //will still keep the activity function running and incur costs??
         await Task.Delay(actualDelay);

         //perform subtask work after delay! 
    }

I would only like to fan out (no fan-in to collect the results) and start the subtasks. The orchestrator starts all the tasks and avoids calling 'await Task.WhenAll'. The activity function calls 'Task.Delay' to wait for the specified amount of time and then does its work.

My questions

  • Does it make sense to use Durable Functions for this workflow?
  • Is this the right approach to orchestrate a 'Fan-out' workflow?
  • I do not like the fact that the activity function keeps running for the specified amount of time (3 or 5 minutes) doing nothing. Will it incur costs?
  • Also, if a delay of more than 10 minutes is required, there is no way for an activity function to succeed with this approach!
  • My earlier attempt to avoid this was to use 'CreateTimer' in the orchestrator and then add the activity as a continuation, but I see only timer entries in the 'History' table. The continuation does not fire! Am I violating the constraint for orchestrator code, 'Orchestrator code must never initiate any async operation'?

foreach (var value in results)
{
        //calculate time to start
        var timeToStart = ;
        var pnTask = context.CreateTimer(timeToStart , CancellationToken.None).ContinueWith(t => context.CallActivityAsync("PerformSubTask", value));
        tasks.Add(pnTask);
}

UPDATE: using the approach suggested by Chris

Activity that calculates subtasks and delays:

[FunctionName("CalculateTasks")]
public static List<TaskInfo> CalculateTasks([ActivityTrigger]string input,TraceWriter log)
{
    //in reality time is obtained by calling an endpoint 
    DateTime currentTime = DateTime.UtcNow;
    return new List<TaskInfo> {
        new TaskInfo{ DelayInSeconds = 10, Origin = currentTime },
        new TaskInfo{ DelayInSeconds = 20, Origin = currentTime },
        new TaskInfo{ DelayInSeconds = 30, Origin = currentTime },
    };
}

public static async Task Orchestrator([OrchestrationTrigger] DurableOrchestrationContext context, TraceWriter log)
{
    var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");
    var currentTime = context.CurrentUtcDateTime;
    List<Task> tasks = new List<Task>();
    foreach (var value in results)
    {
        TimeSpan timeDifference = currentTime - value.Origin;
        TimeSpan delay = TimeSpan.FromSeconds(value.DelayInSeconds);
        var actualDelay = timeDifference > delay ? TimeSpan.Zero : delay - timeDifference;

        var timeToStart = currentTime.Add(actualDelay);

        Task delayedActivityCall = context
             .CreateTimer(timeToStart, CancellationToken.None)
             .ContinueWith(t => context.CallActivityAsync("PerformSubtask", value));
        tasks.Add(delayedActivityCall);
    }

    await Task.WhenAll(tasks);
}

Simply scheduling tasks from within the orchestrator seems to work. In my case, I am calculating the tasks and the delays in another activity (CalculateTasks) before the loop. I want the delays to be calculated using the 'current time' at which the activity was run, so I am using DateTime.UtcNow in the activity. This somehow does not play well when the value is used in the orchestrator. The activities specified by 'ContinueWith' just don't run, and the orchestrator is always in the 'Running' state.

Can I not use a time recorded by an activity inside the orchestrator?
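The delay arithmetic from the updated orchestrator can be isolated in a small pure helper, which makes it easier to reason about separately from the scheduling. The following is a minimal sketch, not the asker's actual code: the class name `DelayMath` and the method names `RemainingDelay` and `FireAt` are hypothetical. It assumes that inside an orchestrator the `nowUtc` argument would come from `context.CurrentUtcDateTime` (not `DateTime.UtcNow`), while `originUtc` is the timestamp recorded by the activity.

```csharp
using System;

// Hypothetical helper; the names are illustrative and not part of Durable Functions.
public static class DelayMath
{
    // Remaining wait before a subtask is due, clamped to zero when it is already overdue.
    public static TimeSpan RemainingDelay(DateTime originUtc, int delayInSeconds, DateTime nowUtc)
    {
        TimeSpan elapsed = nowUtc - originUtc;
        TimeSpan delay = TimeSpan.FromSeconds(delayInSeconds);
        return elapsed > delay ? TimeSpan.Zero : delay - elapsed;
    }

    // Absolute UTC time that could be handed to a durable timer.
    public static DateTime FireAt(DateTime originUtc, int delayInSeconds, DateTime nowUtc)
    {
        return nowUtc + RemainingDelay(originUtc, delayInSeconds, nowUtc);
    }
}
```

Because both timestamps are passed in as arguments, the helper returns the same result every time it is given the same inputs, which is the property orchestrator code needs during replay.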

UPDATE 2

So the workaround suggested by Chris works!

Since I do not want to collect the results of the activities, I avoid calling 'await Task.WhenAll(tasks)' after scheduling all activities. I do this in order to reduce the contention on the control queue, i.e. to be able to start another orchestration if required. Nevertheless, the status of the orchestrator is still 'Running' until all the activities finish running. I guess it moves to 'Complete' only after the last activity posts a 'done' message to the control queue.

Am I right? Is there any way to free the orchestrator earlier, i.e. right after scheduling all activities?

Recommended answer

The ContinueWith approach worked fine for me. I was able to simulate a version of your scenario using the following orchestrator code:

[FunctionName("Orchestrator")]
public static async Task Orchestrator(
    [OrchestrationTrigger] DurableOrchestrationContext context,
    TraceWriter log)
{
    var tasks = new List<Task>(10);
    for (int i = 0; i < 10; i++)
    {
        int j = i;
        DateTime timeToStart = context.CurrentUtcDateTime.AddSeconds(10 * j);
        Task delayedActivityCall = context
            .CreateTimer(timeToStart, CancellationToken.None)
            .ContinueWith(t => context.CallActivityAsync("PerformSubtask", j));
        tasks.Add(delayedActivityCall);
    }

    await Task.WhenAll(tasks);
}

And for what it's worth, here is the activity function code:

[FunctionName("PerformSubtask")]
public static void Activity([ActivityTrigger] int j, TraceWriter log)
{
    log.Warning($"{DateTime.Now:o}: {j:00}");
}

From the log output, I saw that all activity invocations ran 10 seconds apart from each other.

Another approach would be to fan out to multiple sub-orchestrations (as @jeffhollan suggested), each of which is simply a short sequence of a durable timer delay followed by your activity call.
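The sub-orchestration idea could look roughly like the sketch below. It is written against the Durable Functions 1.x API used elsewhere in this thread and has not been run against the asker's app. The function names `FanOutOrchestrator` and `DelayedSubTask` are hypothetical, and the `TaskInfo` shape is inferred from the question, so the real type may differ. Each sub-orchestration awaits its own durable timer and then calls the activity with a plain `await`, so no `ContinueWith` is involved.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;

// Shape inferred from the question; the asker's real TaskInfo may differ.
public class TaskInfo
{
    public int DelayInSeconds { get; set; }
    public DateTime Origin { get; set; }
}

public static class FanOutWithSubOrchestrations
{
    // Parent: computes the subtasks, then starts one sub-orchestration per subtask.
    [FunctionName("FanOutOrchestrator")]
    public static async Task FanOut(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        var results = await context.CallActivityAsync<List<TaskInfo>>("CalculateTasks", "someinput");

        var tasks = new List<Task>();
        foreach (TaskInfo info in results)
        {
            tasks.Add(context.CallSubOrchestratorAsync("DelayedSubTask", info));
        }

        await Task.WhenAll(tasks);
    }

    // Child: waits on a durable timer until the subtask is due, then runs the activity.
    [FunctionName("DelayedSubTask")]
    public static async Task DelayedSubTask(
        [OrchestrationTrigger] DurableOrchestrationContext context)
    {
        TaskInfo info = context.GetInput<TaskInfo>();
        DateTime fireAt = info.Origin.ToUniversalTime().AddSeconds(info.DelayInSeconds);

        // Skip the timer when the subtask is already overdue.
        if (fireAt > context.CurrentUtcDateTime)
        {
            await context.CreateTimer(fireAt, CancellationToken.None);
        }

        await context.CallActivityAsync("PerformSubtask", info);
    }
}
```

The delay is spent in a durable timer rather than in `Task.Delay` inside an activity, so no activity function sits idle while waiting.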

UPDATE: I tried using your updated sample and was able to reproduce your problem! If you run locally in Visual Studio and configure the exception settings to always break on exceptions, then you should see the following:

System.InvalidOperationException: 'Multithreaded execution was detected. This can happen if the orchestrator function code awaits on a task that was not created by a DurableOrchestrationContext method. More details can be found in this article https://docs.microsoft.com/en-us/azure/azure-functions/durable-functions-checkpointing-and-replay#orchestrator-code-constraints.'

This means the thread which called context.CallActivityAsync("PerformSubtask", j) was not the same as the thread which called the orchestrator function. I don't know why my initial example didn't hit this, or why your version did. It has something to do with how the TPL decides which thread to use to run your ContinueWith delegate, which is something I need to look into further.

The good news is that there is a simple workaround, which is to specify TaskContinuationOptions.ExecuteSynchronously, like this:

Task delayedActivityCall = context
    .CreateTimer(timeToStart, CancellationToken.None)
    .ContinueWith(
        t => context.CallActivityAsync("PerformSubtask", j),
        TaskContinuationOptions.ExecuteSynchronously);

Please try that and let me know if it fixes the issue you're observing.
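To see what TaskContinuationOptions.ExecuteSynchronously changes, the following standalone .NET sketch (no Durable Functions involved) compares the thread that completes an antecedent task with the thread that runs its continuation. The class and method names are hypothetical. It only illustrates the documented TPL behavior that a synchronous continuation runs on the thread that moves the antecedent into its final state; it does not reproduce the orchestrator scenario itself.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class; plain TPL only.
public static class ContinuationThreadDemo
{
    // Returns true when the continuation ran on the thread that completed the antecedent.
    public static bool RunsOnCompletingThread()
    {
        var antecedent = new TaskCompletionSource<int>();
        int continuationThreadId = -1;

        Task continuation = antecedent.Task.ContinueWith(
            t => { continuationThreadId = Thread.CurrentThread.ManagedThreadId; },
            TaskContinuationOptions.ExecuteSynchronously);

        int completingThreadId = Thread.CurrentThread.ManagedThreadId;

        // Completing the antecedent here is expected to run the continuation inline.
        antecedent.SetResult(42);
        continuation.Wait();

        return continuationThreadId == completingThreadId;
    }
}
```

Without the option, the continuation is queued to the current task scheduler and may run on a different thread, which is the situation the 'Multithreaded execution was detected' check guards against.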

Ideally you wouldn't need this workaround when using Task.ContinueWith. I've opened an issue on GitHub to track this: https://github.com/Azure/azure-functions-durable-extension/issues/317

"Since I do not want to collect the results of the activities, I avoid calling await Task.WhenAll(tasks) after scheduling all activities. I do this in order to reduce the contention on the control queue, i.e. to be able to start another orchestration if required. Nevertheless, the status of the orchestrator is still 'Running' until all the activities finish running. I guess it moves to 'Complete' only after the last activity posts a 'done' message to the control queue."

This is expected. Orchestrator functions never actually complete until all outstanding durable tasks have completed. There isn't any way to work around this. Note that you can still start other orchestrator instances; there just might be some contention if they happen to land on the same partition (there are 4 partitions by default).
