根据调度程序将 async-await C# 代码转换为 F# [英] Translating async-await C# code to F# with respect to the scheduler

查看:19
本文介绍了根据调度程序将 async-await C# 代码转换为 F#的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想知道这是否是一个过于宽泛的问题,但最近我让自己遇到了一段代码,我想确定如何从 C# 转换为正确的 F#.旅程从这里 (1) 开始(TPL-F# 交互的原始问题),以及继续这里 (2) (我正在考虑将一些示例代码翻译成 F#).

I wonder if this is too a broad question, but recently I made myself to come across a piece of code I'd like to be certain on how to translate from C# into proper F#. The journey starts from here (1) (the original problem with TPL-F# interaction), and continues here (2) (some example code I'm contemplating to translate into F#).

示例代码太长,无法在此重现,但有趣的功能是ActivateAsyncRefreshHubsAddHub.特别有趣的地方是

The example code is too long to reproduce here, but the interesting functions are ActivateAsync, RefreshHubs and AddHub. Particularly the interesting points are

  1. AddHub 有一个private async Task AddHub(string address)的签名.
  2. RefreshHubs 在循环中调用 AddHub 并收集一个 tasks 列表,然后在最后通过 await 等待Task.WhenAll(tasks),因此返回值与其 private async Task RefreshHubs(object _) 的签名相匹配.
  3. RefreshHubsActivateAsync 调用,就像 await RefreshHubs(null) 一样,然后最后有一个调用 await base.ActivateAsync() 匹配函数签名public override async Task ActivateAsync().
  1. AddHub has a signature of private async Task AddHub(string address).
  2. RefreshHubs calls AddHub in a loop and collects a list of tasks, which it then awaits in the very end by await Task.WhenAll(tasks) and consequently the return value matches its signature of private async Task RefreshHubs(object _).
  3. RefreshHubs is called by ActivateAsync just as await RefreshHubs(null) and then in the end there's a call await base.ActivateAsync() matching the function signature public override async Task ActivateAsync().

问题:

这样的函数签名到 F# 的正确转换是什么,它仍然保持界面和功能并尊重默认的自定义调度程序?而且我也不太确定这种F# 中的异步/等待".至于如何机械地"做到这一点.:)

What would be the correct translation of such function signatures to F# that still maintains the interface and functionality and respects the default, custom scheduler? And I'm not otherwise too sure of this "async/await in F#" either. As in how to do it "mechanically". :)

原因是在链接here (1)"中似乎存在问题(我尚未验证这一点),因为 F# 异步操作不尊重(奥尔良)运行时设置的自定义协作调度程序.此外,here 指出 TPL 操作逃脱了scheduler 并进入任务池,因此禁止使用它们.

The reason is that in the link "here (1)" there seem to be problem (I haven't verified this) in that F# async operations do not respect a custom, cooperative scheduler set by the (Orleans) runtime. Also, it's stated here that TPL operations escape the scheduler and go to the task pool and their use is therefore prohibited.

我能想到的一种处理方法是使用 F# 函数,如下所示

One way I can think of dealing with this is with a F# function as follows

//Sorry for the inconvenience of shorterned code, for context see the link "here (1)"...
override this.ActivateAsync() =
    this.RegisterTimer(new Func<obj, Task>(this.FlushQueue), null, TimeSpan.FromMilliseconds(100.0), TimeSpan.FromMilliseconds(100.0)) |> ignore

    if RoleEnvironment.IsAvailable then
        this.RefreshHubs(null) |> Async.awaitPlainTask |> Async.RunSynchronously
    else
        this.AddHub("http://localhost:48777/") |> Async.awaitPlainTask |> Async.RunSynchronously

    //Return value comes from here.
    base.ActivateAsync()

member private this.RefreshHubs(_) =
    //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
    //The return value is Task.
    //In the C# version the AddHub provided tasks are collected and then the
    //on the last line there is return await Task.WhenAll(newHubAdditionTasks) 
    newHubs |> Array.map(fun i -> this.AddHub(i)) |> Task.WhenAll

member private this.AddHub(address) =
    //Code omitted, in case mor context is needed, take a look at the link "here (2)", sorry for the inconvinience...
    //In the C# version:
    //...
    //hubs.Add(address, new Tuple<HubConnection, IHubProxy>(hubConnection, hub))
    //} 
    //so this is "void" and could perhaps be Async<void> in F#... 
    //The return value is Task.
    hubConnection.Start() |> Async.awaitTaskVoid |> Async.RunSynchronously
    TaskDone.Done

startAsPlainTask 函数由 Sacha Barber 提供,来自 此处.另一个有趣的选择可能是这里作为

The startAsPlainTask function is by Sacha Barber from here. Another interesting option could be here as

module Async =
    let AwaitTaskVoid : (Task -> Async<unit>) =
        Async.AwaitIAsyncResult >> Async.Ignore

<edit: 我刚刚注意到 Task.WhenAll 也需要等待.但正确的方法是什么?呃,该睡觉了(一个糟糕的双关语)...

<edit: I just noticed the Task.WhenAll would need to be awaited too. But what would be the proper way? Uh, time to sleep (a bad pun)...

<edit 2:这里 (1)(TPL-F# 交互的原始问题)在 Codeplex 中提到 F# 使用同步上下文将工作推送到线程,而 TPL 没有.现在,这是一个合理的解释,我觉得(尽管无论自定义调度程序如何,我在正确翻译这些片段时仍然存在问题).一些有趣的附加信息可能来自

<edit 2: At here (1) (the original problem with TPL-F# interaction) in Codeplex it was mentioned that F# uses synchronization contexts to push work to threads, whereas TPL does not. Now, this is a plausible explanation, I feel (although I'd still have problems in translating these snippets properly regardless of the custom scheduler). Some interesting additional information could be to had from

  • How to get a Task that uses SynchronizationContext? And how are SynchronizationContext used anyway?
  • Await, SynchronizationContext, and Console Apps wherein an example SingleThreadSynchronizationContext is provided that looks like queues the work to be executed. Maybe this ought to be used?

在这种情况下,我想我需要提及 Hopac,作为一个有趣的切线,并且还提到我在接下来的 50 多个小时左右,我都无法访问,以防我所有的交叉发布都失控了.

I think I need to mention Hopac in this context, as an interesting tangential and also mention I'm out of reach for the next 50 odd hours or so in case all my cross-postings go out of hand.

<edit 3:丹尼尔svick 在评论中给出了使用自定义任务构建器的好建议.Daniel 提供了一个链接,该链接已在 FSharpx.

<edit 3: Daniel and svick give good advice in the comments to use a custom task builder. Daniel provides a link to a one that's already defined in FSharpx.

看源码我看到接口带参数都定义为

Looking at the the source I see the interface with the parameters are defined as

type TaskBuilder(?continuationOptions, ?scheduler, ?cancellationToken) =
    let contOptions = defaultArg continuationOptions TaskContinuationOptions.None
    let scheduler = defaultArg scheduler TaskScheduler.Default
    let cancellationToken = defaultArg cancellationToken CancellationToken.None

如果要在 Orleans 中使用它,看起来 TaskScheduler 应该是 TaskScheduler.Current 根据文档 这里

If one were to use this in Orleans, it looks like the TaskScheduler ought to be TaskScheduler.Current as per documentation here

Orleans 有它自己的任务调度器,它提供单线程在grain中使用的执行模型.跑步的时候很重要使用 Orleans 调度程序的任务,而不是 .NET 线程池.

Orleans has it's own task scheduler which provides the single threaded execution model used within grains. It's important that when running tasks the Orleans scheduler is used, and not the .NET thread pool.

如果你的grain代码需要创建一个子任务,你应该使用Task.Factory.StartNew:

Should your grain code require a subtask to be created, you should use Task.Factory.StartNew:

await Task.Factory.StartNew(() =>{/* 逻辑 */});

await Task.Factory.StartNew(() =>{ /* logic */ });

此技术将使用当前的任务调度程序,即奥尔良调度程序.

This technique will use the current task scheduler, which will be the Orleans scheduler.

你应该避免使用 Task.Run,​​它总是使用 .NET 线程池,因此不会在单线程执行中运行模型.

You should avoid using Task.Run, which always uses the .NET thread pool, and therefore will not run in the single-threaded execution model.

看起来 TaskScheduler.CurrentTaskScheduler.Default.也许这需要提出一个问题,即在哪些示例情况下会出现不希望有的差异.正如奥尔良文档指出不要使用 Task.Run 而是使用 Task.Factory.StartNew 的指南,我想知道是否应该定义 TaskCreationOptions.DenyAttachChild 推荐的Stephen Toub 等权威机构>Task.Run 与 Task.Factory.StartNewStephen ClearyStartNew 是危险的.嗯,看起来 .Default 将是 .DenyAttachChilld 除非我弄错了.

It looks there's a subtle difference between TaskScheduler.Current and TaskScheduler.Default. Maybe this warrants a question on in which example cases there'll be an undesired difference. As the Orleans documentation points out not to use Task.Run and instead guides to Task.Factory.StartNew, I wonder if one ought to define TaskCreationOptions.DenyAttachChild as is recommended by such authorities as Stephen Toub at Task.Run vs Task.Factory.StartNew and Stephen Cleary at StartNew is Dangerous. Hmm, it looks like the .Default will be .DenyAttachChilld unless I'm mistaken.

此外,由于 Task.Run viz Task.Factory.CreateNew 关于自定义调度程序存在问题,我想知道是否可以通过使用来消除这个特定问题自定义 TaskFactory任务计划程序(任务.Factory) 并控制线程数如何:创建限制并发的任务计划程序.

Moreover, as there is a problem with Task.Run viz Task.Factory.CreateNew regarding the custom scheduler, I wonder if this particular problem could be removed by using a custom TaskFactory as explained in Task Scheduler (Task.Factory) and controlling the number of threads and How to: Create a Task Scheduler That Limits Concurrency.

嗯,这已经变成了一个相当长的思考".我想知道我应该如何关闭这个?也许如果 svickDaniel 可以将他们的评论作为答案,我会赞成并接受 svick 的?

Hmm, this is becoming quite a long "pondering" already. I wonder how should I close this? Maybe if svick and Daniel could make their comments as answers and I'd upvote both and accept svick's?

推荐答案

您可以在 FSharpx 中使用 TaskBuilder 并传入 TaskScheduler.Current.这是我尝试翻译 RefreshHubs.请注意,Task 用于代替 Task.

You can use use TaskBuilder in FSharpx and pass in TaskScheduler.Current. Here's my attempt at translating RefreshHubs. Note that Task<unit> is used in lieu of Task.

let RefreshHubs _ =
    let task = TaskBuilder(scheduler = TaskScheduler.Current)
    task {
        let addresses = 
            RoleEnvironment.Roles.["GPSTracker.Web"].Instances
            |> Seq.map (fun instance ->
                let endpoint = instance.InstanceEndpoints.["InternalSignalR"]
                sprintf "http://%O" endpoint.IPEndpoint
            )
            |> Seq.toList

        let newHubs = addresses |> List.filter (not << hubs.ContainsKey)
        let deadHubs = hubs.Keys |> Seq.filter (fun x -> 
            not (List.exists ((=) x) addresses))

        // remove dead hubs
        deadHubs |> Seq.iter (hubs.Remove >> ignore)

        // add new hubs
        let! _ = Task.WhenAll [| for hub in newHubs -> AddHub hub |]
        return ()
    }

这篇关于根据调度程序将 async-await C# 代码转换为 F#的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆