需要有关 Async 和 fsi 的帮助 [英] Need help regarding Async and fsi
问题描述
我想编写一些代码来运行一系列 F# 脚本 (.fsx).问题是我可以拥有数百个脚本,如果我这样做:
I'd like to write some code that runs a sequence of F# scripts (.fsx). The thing is that I could have literally hundreds of scripts and if I do that:
let shellExecute program args =
let startInfo = new ProcessStartInfo()
do startInfo.FileName <- program
do startInfo.Arguments <- args
do startInfo.UseShellExecute <- true
do startInfo.WindowStyle <- ProcessWindowStyle.Hidden
//do printfn "%s" startInfo.Arguments
let proc = Process.Start(startInfo)
()
scripts
|> Seq.iter (shellExecute "fsi")
它可能给我的 2GB 系统带来太多压力.无论如何,我想按 n 批运行脚本,这似乎也是学习 Async
的一个很好的练习(我想这是要走的路).
it could stress too much my 2GB system. Anyway, I'd like to run scripts by batch of n, which seems also a good exercise for learning Async
(I guess it's the way to go).
我已经开始为此编写一些代码,但不幸的是它不起作用:
I have started to write some code for that but unfortunately it doesn't work:
open System.Diagnostics
let p = shellExecute "fsi" @"C:UsersStringerfoo.fsx"
async {
let! exit = Async.AwaitEvent p.Exited
do printfn "process has exited"
}
|> Async.StartImmediate
foo.fsx 只是一个 hello world 脚本.解决这个问题最惯用的方法是什么?
foo.fsx is just a hello world script. What would be the most idiomatic way of solving this problem?
我还想弄清楚是否可以为每个正在执行的脚本检索返回码,如果不能,请寻找另一种方法.谢谢!
I'd like also to figure out if it's doable to retrieve a return code for each executing script and if not, find another way. Thanks!
非常感谢您的见解和链接!我学到了很多.我只想添加一些代码来使用 Async.Parallel
并行运行批处理,正如 Tomas 建议的那样.如果我的 cut
函数有更好的实现,请发表评论.
Thanks a lot for your insights and links! I've learned a lot.
I just want to add some code for running batchs in parallel using Async.Parallel
as Tomas suggested it. Please comment if there is a better implementation for my cut
function.
module Seq =
/// Returns a sequence of sequences of N elements from the source sequence.
/// If the length of the source sequence is not a multiple
/// of N, last element of the returned sequence will have a length
/// included between 1 and N-1.
let cut (count : int) (source : seq<´T>) =
let rec aux s length = seq {
if (length < count) then yield s
else
yield Seq.take count s
if (length <> count) then
yield! aux (Seq.skip count s) (length - count)
}
aux source (Seq.length source)
let batchCount = 2
let filesPerBatch =
let q = (scripts.Length / batchCount)
q + if scripts.Length % batchCount = 0 then 0 else 1
let batchs =
scripts
|> Seq.cut filesPerBatch
|> Seq.map Seq.toList
|> Seq.map loop
Async.RunSynchronously (Async.Parallel batchs) |> ignore
编辑 2:
所以我在让 Tomas 的守卫代码工作时遇到了一些麻烦.我猜必须在 AddHandler
方法中调用 f
函数,否则我们将永远失去事件......这是代码:
So I had some troubles to get Tomas's guard code working. I guess the f
function had to be called in AddHandler
method, otherwise we loose the event for ever... Here's the code:
module Event =
let guard f (e:IEvent<´Del, ´Args>) =
let e = Event.map id e
{ new IEvent<´Args> with
member this.AddHandler(d) = e.AddHandler(d); f() //must call f here!
member this.RemoveHandler(d) = e.RemoveHandler(d); f()
member this.Subscribe(observer) =
let rm = e.Subscribe(observer) in f(); rm }
有趣的事情(如 Tomas 提到的)是,当进程终止时,Exited
事件似乎存储在某处,即使该进程尚未以 EnableRaisingEvents
设置为真.当这个属性最终设置为 true 时,事件就会被触发.
The interesting thing (as mentioned by Tomas) is that it looks like the Exited
event is stored somewhere when the process terminates, even though the process has not started with EnableRaisingEvents
set to true.
When this property is finally set to true, the event is fired up.
由于我不确定这是官方规范(也有点偏执),我找到了另一种解决方案,即在 guard
函数中启动进程,因此我们确保代码将适用于任何情况:
Since I'm not sure that this is the official specification (and also a bit paranoid), I found another solution that consists in starting the process in the guard
function, so we ensure that the code will work on whichever situation:
let createStartInfo program args =
new ProcessStartInfo
(FileName = program, Arguments = args, UseShellExecute = false,
WindowStyle = ProcessWindowStyle.Normal,
RedirectStandardOutput = true)
let createProcess info =
let p = new Process()
do p.StartInfo <- info
do p.EnableRaisingEvents <- true
p
let rec loop scripts = async {
match scripts with
| [] -> printfn "FINISHED"
| script::scripts ->
let args = sprintf ""%s"" script
let p = createStartInfo "notepad" args |> createProcess
let! exit =
p.Exited
|> Event.guard (fun () -> p.Start() |> ignore)
|> Async.AwaitEvent
let output = p.StandardOutput.ReadToEnd()
do printfn "
PROCESSED: %s, CODE: %d, OUTPUT: %A"script p.ExitCode output
return! loop scripts
}
请注意,我已将 fsi.exe 替换为 notepad.exe,因此我可以在调试器中逐步重放不同的场景并自己明确控制进程的退出.
Notice I've replaced fsi.exe by notepad.exe so I can replay different scenarios step by step in the debugger and control explicitly the exit of the process myself.
推荐答案
我做了一些实验,这里有一种方法可以解决我帖子下方的评论和 Joel 的回答中讨论的问题(我认为这不是目前无法使用,但可以修复).
I did some experiments and here is one way to deal with the problem discussed in the comments below my post and in the answer from Joel (which I think doesn't work currently, but could be fixed).
我认为Process
的规范是在我们设置EnableRaisingEvents
后可以触发Exited
事件属性为 true
(即使在我们设置属性之前进程已经完成,也会触发事件).为了正确处理这种情况,我们需要在将处理程序附加到 Exited
事件之后 启用引发事件.
I think the specification of Process
is that it can trigger the Exited
event after we set the EnableRaisingEvents
property to true
(and will trigger the event even if the process has already completed before we set the property). To handle this case correctly, we need to enable raising of events after we attach handler to the Exited
event.
这是一个问题,因为如果我们使用 AwaitEvent
,它将阻塞工作流,直到事件触发.在从工作流中调用 AwaitEvent
之后,我们不能做任何事情(如果我们在调用 AwaitEvent
之前设置了属性,那么我们会遇到竞争......).Vladimir 的方法是正确的,但是我认为有一种更简单的方法可以解决这个问题.
This is a problme, because if we use AwaitEvent
it will block the workflow until the event fires. We cannot do anything after calling AwaitEvent
from the workflow (and if we set the property before calling AwaitEvent
, then we get a race....). Vladimir's approach is correct, but I think there is a simpler way to deal with this.
我将创建一个函数 Event.guard
接受一个事件并返回一个事件,这允许我们指定一些将在 附加处理程序后执行的函数到活动.这意味着如果我们在这个函数内部做一些操作(反过来触发事件),事件将被处理.
I'll create a function Event.guard
taking an event and returning an event, which allows us to specify some function that will be executed after a handler is attached to the event. This means that if we do some operation (which in turn triggers the event) inside this function, the event will be handled.
为了解决这里讨论的问题,我们需要将我原来的解决方案改成如下.首先,shellExecute
函数不能设置 EnableRaisingEvents
属性(否则,我们可能会丢失事件!).其次,等待代码应该是这样的:
To use it for the problem discussed here, we need to change my original solution as follows. Firstly, the shellExecute
function must not set the EnableRaisingEvents
property (otherwise, we could lose the event!). Secondly, the waiting code should look like this:
let rec loop scripts = async {
match scripts with
| [] -> printf "FINISHED"
| script::scripts ->
let p = shellExecute fsi script
let! exit =
p.Exited
|> Event.guard (fun () -> p.EnableRaisingEvents <- true)
|> Async.AwaitEvent
let output = p.StandardOutput.ReadToEnd()
return! loop scripts }
注意 Event.guard
函数的使用.粗略地说,它表示在工作流将处理程序附加到 p.Exited
事件后,提供的 lambda 函数将运行(并将启用事件引发).但是,我们已经将处理程序附加到事件中,所以如果这立即引发事件,我们就没事!
Note the use of the Event.guard
function. Roughly, it says that after the workflow attaches handler to the p.Exited
event, the provided lambda function will run (and will enable raising of events). However, we already attached the handler to the event, so if this causes the event immediately, we're fine!
实现(对于Event
和Observable
)如下所示:
The implementation (for both Event
and Observable
) looks like this:
module Event =
let guard f (e:IEvent<'Del, 'Args>) =
let e = Event.map id e
{ new IEvent<'Args> with
member x.AddHandler(d) = e.AddHandler(d)
member x.RemoveHandler(d) = e.RemoveHandler(d); f()
member x.Subscribe(observer) =
let rm = e.Subscribe(observer) in f(); rm }
module Observable =
let guard f (e:IObservable<'Args>) =
{ new IObservable<'Args> with
member x.Subscribe(observer) =
let rm = e.Subscribe(observer) in f(); rm }
好消息是这段代码非常简单.
Nice thing is that this code is very straightforward.
这篇关于需要有关 Async 和 fsi 的帮助的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!