如何正确使用F#中的TryScan [英] How to use TryScan in F# properly

查看:160
本文介绍了如何正确使用F#中的TryScan的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图找到一个关于如何使用 TryScan 的示例,但是没有发现任何问题,你能帮我吗?

I was trying to find an example about how to use TryScan, but haven't found any, could you help me?

我想做什么(很简单的例子):我有一个 MailboxProcessor 接受
两种类型的条件。

What I would like to do (quite simplified example): I have a MailboxProcessor that accepts two types of mesages.


  • 第一个 GetState 返回当前状态。
    GetState 消息发送频繁

  • First one GetState returns current state. GetState messages are sent quite frequently

其他 UpdateState 非常昂贵(耗时) - 例如从互联网下载某物,然后相应地更新状态。
UpdateState 很少调用。

The other UpdateState is very expensive (time consuming) - e.g. downloading something from internet and then updates the state accordingly. UpdateState is called only rarely.

我的问题是 - messages GetState 直到提供 UpdateState 之前。这就是为什么我试图使用 TryScan 来处理所有 GetState 消息,但没有运气。

My problem is - messages GetState are blocked and wait until preceding UpdateState are served. That's why I tried to use TryScan to process all GetState messages, but with no luck.

我的示例代码:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                mbox.TryScan(fun m ->
                    match m with 
                    | GetState(chnl) -> 
                        printfn "G processing TryScan"
                        chnl.Reply(state)
                        Some(async { return! loop state})
                    | _ -> None
                ) |> ignore

                let! msg = mbox.Receive()
                match msg with
                | UpdateState ->
                    printfn "U processing"
                    // something very time consuming here...
                    async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                    return! loop (state+1)
                | GetState(chnl) ->
                    printfn "G processing"
                    chnl.Reply(state)
                    return! loop state
             }
             loop 0
)

[async { for i in 1..10 do 
          printfn " U"
          mbox.Post(UpdateState)
          async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
        async { do! Async.Sleep(500) } |> Async.RunSynchronously
        for i in 1..20 do 
          printfn "G"
          printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously



如果尝试运行代码,您将看到 GetState 消息几乎不会被处理,因为它等待结果。另一方面, UpdateState 只是fire-and-forget,因此阻塞有效获取状态。

If you try to run the code, you will see, that GetState message is not almost processed, because it waits for the result. On the other hand UpdateState is only fire-and-forget, thus blocking effectively getting state.

修改

适用于我的当前解决方案是:

Current solution that works for me is this one:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                let! res = mbox.TryScan((function
                    | GetState(chnl) -> Some(async {
                            chnl.Reply(state)
                            return state
                        })
                    | _ -> None
                ), 5)

                match res with
                | None ->
                    let! msg = mbox.Receive()
                    match msg with
                        | UpdateState ->
                            async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                            return! loop (state+1)
                        | _ -> return! loop state
                | Some n -> return! loop n
             }
             loop 0
)






对注释的反应:与其他 MailboxProcessor ThreadPool 的执行 UpdateState 并行是伟大的,但我目前不需要它。
我只想处理所有的 GetState 消息,其他的。我不在乎在处理期间 UpdateState 代理被阻止。


Reactions to comments: the idea with other MailboxProcessor or ThreadPool that executes UpdateState in parallel is great, but I don't need it currently. All I wanted to do is to process all GetState messages and after that the others. I don't care that during processing UpdateState the agent is blocked.

我会告诉你什么是输出上的问题:

I'll show you what was the problem on the output:

// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediatelly! (not real example, but illustrates the problem)
 U            200ms   <-- issue UpdateState
U processing          <-- process UpdateState, it takes 1sec, so other 
 U            200ms       5 requests are sent; sent means, that it is
 U            200ms       fire-and-forget message - it doesn't wait for any result
                          and therefore it can send every 200ms one UpdateState message
G                     <-- first GetState sent, but waiting for reply - so all 
                          previous UpdateState messages have to be processed! = 3 seconds
                          and AFTER all the UpdateState messages are processed, result
                          is returned and new GetState can be sent. 
 U            200ms
 U            200ms       because each UpdateState takes 1 second
 U            200ms
U processing
 U
 U
 U
 U
U processing
G processing          <-- now first GetState is processed! so late? uh..
U processing          <-- takes 1sec
3
G
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
G processing          <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line, only GetState are issued and processed, because 
// there is no UpdateState message in the queue, neither it is sent


推荐答案

code> TryScan 方法将在这种情况下帮助您。它允许您指定在等待消息时使用的超时。收到一些消息后,它将开始处理消息(忽略超时)。

I don't think that the TryScan method will help you in this scenario. It allows you to specify timeout to be used while waiting for messages. Once some message is received, it will start processing the message (ignoring the timeout).

例如,如果您想等待某个特定消息,检查每秒(在等待时)你可以写:

For example, if you wanted to wait for some specific message, but perform some other checking every second (while waiting) you could write:

let loop () = async {
  let! res = mbox.TryScan(function
    | ImportantMessage -> Some(async { 
          // process message 
          return 0
        })
    | _ -> None)
  match res with
  | None -> 
       // perform some check & continue waiting
       return! loop ()
  | Some n -> 
       // ImportantMessage was received and processed 
}

避免在处理 UpdateState 消息时阻止邮箱处理器?邮箱处理器是(逻辑上)单线程的 - 你可能不想取消 UpdateState 消息的处理,所以最好的选择是在后台开始处理它,等待处理完成。处理 UpdateState 的代码然后可以将一些消息发送回邮箱(例如 UpdateStateCompleted )。

What can you do to avoid blocking the mailbox processor when processing the UpdateState message? The mailbox processor is (logically) single-threaded - you probably don't want to cancel the processing of UpdateState message, so the best option is to start processing it in background and wait until the processing completes. The code that processes UpdateState can then send some message back to the mailbox (e.g. UpdateStateCompleted).

这是一个草图如何看起来:

Here is a sketch how this might look:

let rec loop (state) = async {
  let! msg = mbox.Receive()
  match msg with
  | GetState(repl) -> 
      repl.Reply(state)
      return! scanning state
  | UpdateState -> 
      async { 
        // complex calculation (runs in parallel)
        mbox.Post(UpdateStateCompleted newState) }
      |> Async.Start
  | UpdateStateCompleted newState ->
      // Received new state from background workflow
      return! loop newState }

现在后台任务并行运行,您需要小心可变状态。此外,如果您发送 UpdateState 消息的速度比处理它们快,您会遇到麻烦。这可以是固定的,例如,当您已经处理了上一个请求时,忽略或排队请求。

Now that the background task is running in parallel, you need to be careful about mutable state. Also, if you send UpdateState messages faster than you can process them, you'll be in trouble. This can be fixed, for example, by ignoring or queueing requests when you're already processing previous one.

这篇关于如何正确使用F#中的TryScan的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆