如何正确使用F#中的TryScan [英] How to use TryScan in F# properly
问题描述
我试图找到一个关于如何使用 TryScan
的示例,但是没有发现任何问题,你能帮我吗?
I was trying to find an example about how to use TryScan
, but haven't found any, could you help me?
我想做什么(很简单的例子):我有一个 MailboxProcessor
接受
两种类型的条件。
What I would like to do (quite simplified example): I have a MailboxProcessor
that accepts
two types of mesages.
-
第一个
GetState
返回当前状态。
GetState
消息发送频繁
First one
GetState
returns current state.GetState
messages are sent quite frequently
其他 UpdateState
非常昂贵(耗时) - 例如从互联网下载某物,然后相应地更新状态。
UpdateState
很少调用。
The other UpdateState
is very expensive (time consuming) - e.g. downloading something from internet and then updates the state accordingly.
UpdateState
is called only rarely.
我的问题是 - messages GetState
直到提供 UpdateState
之前。这就是为什么我试图使用 TryScan
来处理所有 GetState
消息,但没有运气。
My problem is - messages GetState
are blocked and wait until preceding UpdateState
are served. That's why I tried to use TryScan
to process all GetState
messages, but with no luck.
我的示例代码:
type Msg = GetState of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
let rec loop state = async {
// this TryScan doesn't work as expected
// it should process GetState messages and then continue
mbox.TryScan(fun m ->
match m with
| GetState(chnl) ->
printfn "G processing TryScan"
chnl.Reply(state)
Some(async { return! loop state})
| _ -> None
) |> ignore
let! msg = mbox.Receive()
match msg with
| UpdateState ->
printfn "U processing"
// something very time consuming here...
async { do! Async.Sleep(1000) } |> Async.RunSynchronously
return! loop (state+1)
| GetState(chnl) ->
printfn "G processing"
chnl.Reply(state)
return! loop state
}
loop 0
)
[async { for i in 1..10 do
printfn " U"
mbox.Post(UpdateState)
async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
async { do! Async.Sleep(500) } |> Async.RunSynchronously
for i in 1..20 do
printfn "G"
printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously
如果尝试运行代码,您将看到 GetState
消息几乎不会被处理,因为它等待结果。另一方面, UpdateState
只是fire-and-forget,因此阻塞有效获取状态。
If you try to run the code, you will see, that GetState
message is not almost processed, because it waits for the result. On the other hand UpdateState
is only fire-and-forget, thus blocking effectively getting state.
修改
适用于我的当前解决方案是:
Current solution that works for me is this one:
type Msg = GetState of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
let rec loop state = async {
// this TryScan doesn't work as expected
// it should process GetState messages and then continue
let! res = mbox.TryScan((function
| GetState(chnl) -> Some(async {
chnl.Reply(state)
return state
})
| _ -> None
), 5)
match res with
| None ->
let! msg = mbox.Receive()
match msg with
| UpdateState ->
async { do! Async.Sleep(1000) } |> Async.RunSynchronously
return! loop (state+1)
| _ -> return! loop state
| Some n -> return! loop n
}
loop 0
)
对注释的反应:与其他 MailboxProcessor
或 ThreadPool
的执行 UpdateState
并行是伟大的,但我目前不需要它。
我只想处理所有的 GetState
消息,其他的。我不在乎在处理期间 UpdateState
代理被阻止。
Reactions to comments: the idea with other MailboxProcessor
or ThreadPool
that executes UpdateState
in parallel is great, but I don't need it currently.
All I wanted to do is to process all GetState
messages and after that the others. I don't care that during processing UpdateState
the agent is blocked.
我会告诉你什么是输出上的问题:
I'll show you what was the problem on the output:
// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediatelly! (not real example, but illustrates the problem)
U 200ms <-- issue UpdateState
U processing <-- process UpdateState, it takes 1sec, so other
U 200ms 5 requests are sent; sent means, that it is
U 200ms fire-and-forget message - it doesn't wait for any result
and therefore it can send every 200ms one UpdateState message
G <-- first GetState sent, but waiting for reply - so all
previous UpdateState messages have to be processed! = 3 seconds
and AFTER all the UpdateState messages are processed, result
is returned and new GetState can be sent.
U 200ms
U 200ms because each UpdateState takes 1 second
U 200ms
U processing
U
U
U
U
U processing
G processing <-- now first GetState is processed! so late? uh..
U processing <-- takes 1sec
3
G
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
U processing <-- takes 1sec
G processing <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line, only GetState are issued and processed, because
// there is no UpdateState message in the queue, neither it is sent
推荐答案
code> TryScan 方法将在这种情况下帮助您。它允许您指定在等待消息时使用的超时。收到一些消息后,它将开始处理消息(忽略超时)。
I don't think that the TryScan
method will help you in this scenario. It allows you to specify timeout to be used while waiting for messages. Once some message is received, it will start processing the message (ignoring the timeout).
例如,如果您想等待某个特定消息,检查每秒(在等待时)你可以写:
For example, if you wanted to wait for some specific message, but perform some other checking every second (while waiting) you could write:
let loop () = async {
let! res = mbox.TryScan(function
| ImportantMessage -> Some(async {
// process message
return 0
})
| _ -> None)
match res with
| None ->
// perform some check & continue waiting
return! loop ()
| Some n ->
// ImportantMessage was received and processed
}
避免在处理 UpdateState
消息时阻止邮箱处理器?邮箱处理器是(逻辑上)单线程的 - 你可能不想取消 UpdateState
消息的处理,所以最好的选择是在后台开始处理它,等待处理完成。处理 UpdateState
的代码然后可以将一些消息发送回邮箱(例如 UpdateStateCompleted
)。
What can you do to avoid blocking the mailbox processor when processing the UpdateState
message? The mailbox processor is (logically) single-threaded - you probably don't want to cancel the processing of UpdateState
message, so the best option is to start processing it in background and wait until the processing completes. The code that processes UpdateState
can then send some message back to the mailbox (e.g. UpdateStateCompleted
).
这是一个草图如何看起来:
Here is a sketch how this might look:
let rec loop (state) = async {
let! msg = mbox.Receive()
match msg with
| GetState(repl) ->
repl.Reply(state)
return! scanning state
| UpdateState ->
async {
// complex calculation (runs in parallel)
mbox.Post(UpdateStateCompleted newState) }
|> Async.Start
| UpdateStateCompleted newState ->
// Received new state from background workflow
return! loop newState }
现在后台任务并行运行,您需要小心可变状态。此外,如果您发送 UpdateState
消息的速度比处理它们快,您会遇到麻烦。这可以是固定的,例如,当您已经处理了上一个请求时,忽略或排队请求。
Now that the background task is running in parallel, you need to be careful about mutable state. Also, if you send UpdateState
messages faster than you can process them, you'll be in trouble. This can be fixed, for example, by ignoring or queueing requests when you're already processing previous one.
这篇关于如何正确使用F#中的TryScan的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!