需要有关 Async 和 fsi 的帮助 [英] Need help regarding Async and fsi

查看:23
本文介绍了需要有关 Async 和 fsi 的帮助的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想编写一些代码来运行一系列 F# 脚本 (.fsx).问题是我可以拥有数百个脚本,如果我这样做:

I'd like to write some code that runs a sequence of F# scripts (.fsx). The thing is that I could have literally hundreds of scripts and if I do that:

let shellExecute program args =
    let startInfo = new ProcessStartInfo()
    do startInfo.FileName        <- program
    do startInfo.Arguments       <- args
    do startInfo.UseShellExecute <- true
    do startInfo.WindowStyle     <- ProcessWindowStyle.Hidden

    //do printfn "%s" startInfo.Arguments 
    let proc = Process.Start(startInfo)
    ()

scripts
|> Seq.iter (shellExecute "fsi")

可能给我的 2GB 系统带来太多压力.无论如何,我想按 n 批运行脚本,这似乎也是学习 Async 的一个很好的练习(我想这是要走的路).

it could stress too much my 2GB system. Anyway, I'd like to run scripts by batch of n, which seems also a good exercise for learning Async (I guess it's the way to go).

我已经开始为此编写一些代码,但不幸的是它不起作用:

I have started to write some code for that but unfortunately it doesn't work:

open System.Diagnostics

let p = shellExecute "fsi" @"C:UsersStringerfoo.fsx"

async {
    let! exit = Async.AwaitEvent p.Exited
    do printfn "process has exited"
}
|> Async.StartImmediate

foo.fsx 只是一个 hello world 脚本.解决这个问题最惯用的方法是什么?

foo.fsx is just a hello world script. What would be the most idiomatic way of solving this problem?

我还想弄清楚是否可以为每个正在执行的脚本检索返回码,如果不能,请寻找另一种方法.谢谢!

I'd like also to figure out if it's doable to retrieve a return code for each executing script and if not, find another way. Thanks!

非常感谢您的见解和链接!我学到了很多.我只想添加一些代码来使用 Async.Parallel 并行运行批处理,正如 Tomas 建议的那样.如果我的 cut 函数有更好的实现,请发表评论.

Thanks a lot for your insights and links! I've learned a lot. I just want to add some code for running batchs in parallel using Async.Parallel as Tomas suggested it. Please comment if there is a better implementation for my cut function.

module Seq =
  /// Returns a sequence of sequences of N elements from the source sequence.
  /// If the length of the source sequence is not a multiple
  /// of N, last element of the returned sequence will have a length
  /// included between 1 and N-1.
  let cut (count : int) (source : seq<´T>) = 
    let rec aux s length = seq {
      if (length < count) then yield s
      else
        yield Seq.take count s
        if (length <> count) then
          yield! aux (Seq.skip count s) (length - count)
      }
    aux source (Seq.length source)

let batchCount = 2
let filesPerBatch =
  let q = (scripts.Length / batchCount)
  q + if scripts.Length % batchCount = 0 then 0 else 1

let batchs =
  scripts
  |> Seq.cut filesPerBatch
  |> Seq.map Seq.toList
  |> Seq.map loop

Async.RunSynchronously (Async.Parallel batchs) |> ignore

编辑 2:

所以我在让 Tomas 的守卫代码工作时遇到了一些麻烦.我猜必须在 AddHandler 方法中调用 f 函数,否则我们将永远失去事件......这是代码:

So I had some troubles to get Tomas's guard code working. I guess the f function had to be called in AddHandler method, otherwise we loose the event for ever... Here's the code:

module Event =
  let guard f (e:IEvent<´Del, ´Args>) = 
    let e = Event.map id e
    { new IEvent<´Args> with 
        member this.AddHandler(d) = e.AddHandler(d); f() //must call f here!
        member this.RemoveHandler(d) = e.RemoveHandler(d); f()
        member this.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

有趣的事情(如 Tomas 提到的)是,当进程终止时,Exited 事件似乎存储在某处,即使该进程尚未以 EnableRaisingEvents 设置为真.当这个属性最终设置为 true 时,事件就会被触发.

The interesting thing (as mentioned by Tomas) is that it looks like the Exited event is stored somewhere when the process terminates, even though the process has not started with EnableRaisingEvents set to true. When this property is finally set to true, the event is fired up.

由于我不确定这是官方规范(也有点偏执),我找到了另一种解决方案,即在 guard 函数中启动进程,因此我们确保代码将适用于任何情况:

Since I'm not sure that this is the official specification (and also a bit paranoid), I found another solution that consists in starting the process in the guard function, so we ensure that the code will work on whichever situation:

let createStartInfo program args =
  new ProcessStartInfo
    (FileName = program, Arguments = args, UseShellExecute = false,
     WindowStyle = ProcessWindowStyle.Normal, 
     RedirectStandardOutput = true)

let createProcess info =
  let p = new Process()
  do p.StartInfo           <- info
  do p.EnableRaisingEvents <- true
  p

let rec loop scripts = async { 
  match scripts with 
  | [] -> printfn "FINISHED"
  | script::scripts ->
    let args = sprintf ""%s"" script
    let p = createStartInfo "notepad" args |> createProcess
    let! exit =
      p.Exited 
      |> Event.guard (fun () -> p.Start() |> ignore)
      |> Async.AwaitEvent
    let output = p.StandardOutput.ReadToEnd()
    do printfn "
PROCESSED: %s, CODE: %d, OUTPUT: %A"script p.ExitCode output
    return! loop scripts 
  }

请注意,我已将 fsi.exe 替换为 notepad.exe,因此我可以在调试器中逐步重放不同的场景并自己明确控制进程的退出.

Notice I've replaced fsi.exe by notepad.exe so I can replay different scenarios step by step in the debugger and control explicitly the exit of the process myself.

推荐答案

我做了一些实验,这里有一种方法可以解决我帖子下方的评论和 Joel 的回答中讨论的问题(我认为这不是目前无法使用,但可以修复).

I did some experiments and here is one way to deal with the problem discussed in the comments below my post and in the answer from Joel (which I think doesn't work currently, but could be fixed).

认为Process的规范是在我们设置EnableRaisingEvents后可以触发Exited事件属性为 true (即使在我们设置属性之前进程已经完成,也会触发事件).为了正确处理这种情况,我们需要在将处理程序附加到 Exited 事件之后 启用引发事件.

I think the specification of Process is that it can trigger the Exited event after we set the EnableRaisingEvents property to true (and will trigger the event even if the process has already completed before we set the property). To handle this case correctly, we need to enable raising of events after we attach handler to the Exited event.

这是一个问题,因为如果我们使用 AwaitEvent,它将阻塞工作流,直到事件触发.在从工作流中调用 AwaitEvent 之后,我们不能做任何事情(如果我们在调用 AwaitEvent 之前设置了属性,那么我们会遇到竞争......).Vladimir 的方法是正确的,但是我认为有一种更简单的方法可以解决这个问题.

This is a problme, because if we use AwaitEvent it will block the workflow until the event fires. We cannot do anything after calling AwaitEvent from the workflow (and if we set the property before calling AwaitEvent, then we get a race....). Vladimir's approach is correct, but I think there is a simpler way to deal with this.

我将创建一个函数 Event.guard 接受一个事件并返回一个事件,这允许我们指定一些将在 附加处理程序后执行的函数到活动.这意味着如果我们在这个函数内部做一些操作(反过来触发事件),事件将被处理.

I'll create a function Event.guard taking an event and returning an event, which allows us to specify some function that will be executed after a handler is attached to the event. This means that if we do some operation (which in turn triggers the event) inside this function, the event will be handled.

为了解决这里讨论的问题,我们需要将我原来的解决方案改成如下.首先,shellExecute 函数不能设置 EnableRaisingEvents 属性(否则,我们可能会丢失事件!).其次,等待代码应该是这样的:

To use it for the problem discussed here, we need to change my original solution as follows. Firstly, the shellExecute function must not set the EnableRaisingEvents property (otherwise, we could lose the event!). Secondly, the waiting code should look like this:

let rec loop scripts = async { 
  match scripts with 
  | [] -> printf "FINISHED"
  | script::scripts ->
    let p = shellExecute fsi script 
    let! exit = 
      p.Exited 
        |> Event.guard (fun () -> p.EnableRaisingEvents <- true)
        |> Async.AwaitEvent
    let output = p.StandardOutput.ReadToEnd()
    return! loop scripts  } 

注意 Event.guard 函数的使用.粗略地说,它表示在工作流将处理程序附加到 p.Exited 事件后,提供的 lambda 函数将运行(并将启用事件引发).但是,我们已经将处理程序附加到事件中,所以如果这立即引发事件,我们就没事!

Note the use of the Event.guard function. Roughly, it says that after the workflow attaches handler to the p.Exited event, the provided lambda function will run (and will enable raising of events). However, we already attached the handler to the event, so if this causes the event immediately, we're fine!

实现(对于EventObservable)如下所示:

The implementation (for both Event and Observable) looks like this:

module Event =
  let guard f (e:IEvent<'Del, 'Args>) = 
    let e = Event.map id e
    { new IEvent<'Args> with 
        member x.AddHandler(d) = e.AddHandler(d)
        member x.RemoveHandler(d) = e.RemoveHandler(d); f()
        member x.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

module Observable =
  let guard f (e:IObservable<'Args>) = 
    { new IObservable<'Args> with 
        member x.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

好消息是这段代码非常简单.

Nice thing is that this code is very straightforward.

这篇关于需要有关 Async 和 fsi 的帮助的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆