控制可能在F#中发生冲突的异步操作 [英] Controlling async actions that might conflict in F#

查看:71
本文介绍了控制可能在F#中发生冲突的异步操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有许多要在F#中执行的动作(Async<T> list).我可以并行执行这些操作中的大多数 ,但是由于文件锁定等原因,有些操作可能会发生冲突.

I have many actions (Async<T> list) to perform in F#. I can execute most of these actions in parallel, but some might conflict due to file locks etc.

对于每个动作,我可以生成一个键"(int),该键确定动作可能是否冲突:

For each action, I can generate a "key" (int) that determines if the actions might conflict:

  • 如果操作a具有键i,而操作b具有键ji = j,则ab可能会冲突.它们必须顺序执行.
  • 如果操作a具有键i,而操作b具有键ji <> j,则ab将永远不会发生冲突.它们可以并行执行.
  • If action a has key i and action b has key j and i = j, then a and b might conflict. They must be executed serially.
  • If action a has key i and action b has key j and i <> j, then a and b will never conflict. They may be executed in parallel.

我想以一种有效的方式执行我的动作(int * Async<T>) list且没有冲突.

I would like to execute my actions (int * Async<T>) list in an efficient way and without conflicts.

我认为该过程将类似于:

I imagine that the process would be something like:

  • 按键将所有操作分组
  • 将每个组依次分成一个Async
  • 并行运行每个链条

如何在F#中实现呢?

How can I implement this in F#?

通常如何处理这些问题?

How are these problems usually handled?

我尝试完全按顺序执行:

My attempt at a fully sequential implementation:

let wrapTasks<'T> (tasks : (int * Async<'T>) list) : Async<'T list> = async {
  return
    tasks 
    |> Seq.map (fun (k, t) -> t |> Async.RunSynchronously)
    |> Seq.toList
}

推荐答案

使用帮助程序函数对值x进行承诺",对一组值acc进行承诺":

With a helper function taking a 'promise' for a value x and one for a set of values acc:

module Async =
    let sequence x acc = async {
        let! x = x
        let! y = acc
        return x :: y
    }

我们可以通过tasks的锁定ID"异步分组它们,稍微整理一下结果列表,然后将sequence每个组分为一个单独的async,该"c22"包含"该组结果的列表.然后并行处理此列表. ts : 'b list []可用后,我们将其展平:

we can asynchronously group the tasks by their 'lock id', clean up the resulting list a bit and then sequence each group into a single async that 'contains' the list of the results of its group. This list is then processed in parallel. Once ts : 'b list [] is available, we flatten it:

let wrapTasks tasks = async {
    let! ts =
        tasks
        |> List.groupBy fst
        |> List.map (snd >> List.map snd)
        |> List.map (fun asyncs -> List.foldBack Async.sequence asyncs (async { return [] }))
        |> Async.Parallel
    return ts |> List.ofArray |> List.collect id
}

可以通过例如

List.init 50 (fun i -> i % 5, async {
    let now = System.DateTime.UtcNow.Ticks
    do! Async.Sleep 10
    return i, now })
|> wrapTasks
|> Async.RunSynchronously
|> List.groupBy snd
|> List.map (fun (t, rs) -> t, rs |> List.map fst)
|> List.sort

通过改变除数,我们可以调整并行度,并使自己确信该功能按预期运行:-)

By varying the divisor we can adjust the level of parallelism and convince ourselves that the function works as expected :-)

  [(636766393199727614L, [0; 1; 2; 3; 4]);
   (636766393199962986L, [5; 6; 7; 8; 9]);
   (636766393200068008L, [10; 11; 12; 13; 14]);
   (636766393200278385L, [15; 16; 17; 18; 19]);
   (636766393200382690L, [20; 21; 22; 23; 24]);
   (636766393200597692L, [25; 26; 27; 28; 29]);
   (636766393200703235L, [30; 31; 32; 33; 34]);
   (636766393200918241L, [35; 36; 37; 38; 39]);
   (636766393201027938L, [40; 41; 42; 43; 44]);
   (636766393201133307L, [45; 46; 47; 48; 49])]

全部披露:为了获得这个不错的结果,我不得不执行了几次测试.通常数字会有点小.

这篇关于控制可能在F#中发生冲突的异步操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆