使用 F# 代理(agent)实现 Map Reduce [英] Map Reduce with F# agents

查看:21
本文介绍了使用 F# 代理(agent)实现 Map Reduce 的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在玩过 F# 代理后,我尝试使用它们进行 map reduce.

After playing with F# agents I tried to do a map reduce using them.

我使用的基本结构是:

  • map supervisor 将所有工作排入队列并接收来自 map workers 的工作请求
  • reduce supervisor 对 reduce 工作做与 map supervisor 相同的事情
  • 一组负责 map 和 reduce 的 map worker 与 reduce worker,如果其中一个 worker 的工作失败,它会将该工作发送回相应的 supervisor 重新处理.

我想知道的问题是:

  • 与使用 PSeq 的更传统(但非常好)的 map reduce(如 (http://tomasp.net/blog/fsharp-parallel-aggregate.aspx))相比,这是否有意义?
  • 我实现 map worker 和 reduce worker 的方式看起来很丑陋,有没有更好的方法?
  • 似乎我可以创建 1000 000 个 map worker 和 1000 0000 个 reduce worker(哈哈),我应该如何选择这些数量,是越多越好吗?

非常感谢,

/// Shorthand alias for the F# `MailboxProcessor<'T>` agent type.
type Agent<'T> = MailboxProcessor<'T>

/// Reply a supervisor sends back to a worker that asked for work.
type SupervisorResponse<'work> =
    /// A single piece of work for the worker to process.
    | Work of 'work
    /// There was no work left to hand out.
    | NoWork

/// Work-related message accepted by a supervisor.
type WorkMsg<'work> =
    /// Piles a piece of work up in the supervisor's queue.
    | ToDo of 'work
    /// A worker's request for work; the supervisor answers on the enclosed
    /// reply channel.
    | WorkReq of AsyncReplyChannel<SupervisorResponse<'work>>

/// Control command that can be sent to a supervisor agent.
type AgentOperation =
    /// Stop the agent.
    | Stop
    /// Report the current status of the supervisor.
    | Status

/// Every message a supervisor mailbox accepts: either work traffic
/// (`WorkRel`) or a control command (`Operation`).
type SupervisorMsg<'work> =
    | WorkRel of WorkMsg<'work>
    | Operation of AgentOperation

/// Supervisor agents for the map and reduce stages. A supervisor holds a
/// list of pending work items and hands them out to workers on request.
module AgentSupervisor =
    /// Creates a new supervisor agent. The agent is returned unstarted; the
    /// caller is responsible for calling `Start()` on it.
    /// `name` is only used to label the console output.
    let getNew (name:string) =
        new Agent<SupervisorMsg<'work>>(fun inbox -> //'
            // `pending` is the work list. New items are consed onto the front
            // and requests are served from the front, so the most recently
            // queued item is handed out first.
            let rec loop pending = async {
                let! msg = inbox.Receive()
                match msg with
                | WorkRel(ToDo(work)) ->
                    // List cons is `::`; a single `:` is a type annotation
                    // and does not compile here.
                    return! loop (work :: pending)
                | WorkRel(WorkReq(replyChannel)) ->
                    match pending with
                    | [] ->
                        replyChannel.Reply(NoWork)
                        return! loop []
                    | item :: remaining ->
                        // Also covers the single-item case (remaining = []).
                        replyChannel.Reply(Work(item))
                        return! loop remaining
                | Operation(Status) ->
                    Console.WriteLine(name+" current Work Queue "+
                                        string (pending.Length))
                    return! loop pending
                | Operation(Stop) ->
                    // Not looping again ends the agent's message processing.
                    Console.WriteLine("Stoppped SuperVisor Agent "+name)
                    return ()
            }
            loop [] )

    /// Posts a `Stop` operation to the given supervisor.
    let stop (agent:Agent<SupervisorMsg<'work>>) = agent.Post(Operation(Stop))

    /// Posts a `Status` operation to the given supervisor, which makes it
    /// print the length of its work list to the console.
    let status (agent:Agent<SupervisorMsg<'work>>) = agent.Post(Operation(Status))

//Code for the workers

/// Result of running a map or reduce function on one piece of work.
type WorkOutcome<'success> =
    /// The work succeeded and produced a value.
    | Success of 'success
    /// The work failed; workers put failed work back on the supervisor.
    | Fail

/// Messages that drive a worker agent's loop.
type WorkerMsg =
    /// Kicks the worker off; the worker answers by posting `Continue` to itself.
    | Start
    /// Ends the worker's loop.
    | Stop
    /// Makes the worker request the next piece of work from its supervisor.
    | Continue

/// Map and reduce worker agents. A worker repeatedly asks a supervisor for
/// work, processes it, and posts failed work back to that supervisor so it
/// can be processed again.
module AgentWorker =
    /// The two supervisors a map worker talks to: `Map` supplies its work and
    /// `Reduce` receives its successful results.
    type WorkerSupervisors<'reduce,'work> =
        { Map:Agent<SupervisorMsg<'work>> ; Reduce:Agent<SupervisorMsg<'reduce>> }

    /// Posts `Stop` to a worker. The worker handles it once it reaches that
    /// message in its mailbox.
    let stop (agent:Agent<WorkerMsg>) = agent.Post(Stop)

    /// Starts the worker's mailbox and posts `Start` to kick off its loop.
    let start (agent:Agent<WorkerMsg>) =
        agent.Start()
        agent.Post(Start)

    // Asks a supervisor for the next piece of work and asynchronously waits
    // for its reply.
    let private requestWork (supervisor:Agent<SupervisorMsg<'work>>) =
        supervisor.PostAndAsyncReply(fun replyChannel ->
            WorkRel(WorkReq(replyChannel)))

    /// Creates an unstarted map worker.
    /// `map` is applied to each piece of work received from
    /// `supervisors.Map`. A `Success` result is queued on
    /// `supervisors.Reduce`; on `Fail` the original work is queued on
    /// `supervisors.Map` again.
    let getNewMapWorker( map, supervisors:WorkerSupervisors<'reduce,'work>  ) =
        new Agent<WorkerMsg>(fun inbox ->
            let rec loop () = async {
                let! msg = inbox.Receive()
                match msg with
                | Start ->
                    inbox.Post(Continue)
                    return! loop ()
                | Continue ->
                    let! supervisorOrder = requestWork supervisors.Map
                    match supervisorOrder with
                    | Work(work) ->
                        let! res = map work
                        match res with
                        | Success(toReduce) ->
                            supervisors.Reduce.Post(WorkRel(ToDo(toReduce)))
                        | Fail ->
                            Console.WriteLine("Map Fail")
                            supervisors.Map.Post(WorkRel(ToDo(work)))
                    | NoWork -> ()
                    // Whatever the outcome, schedule the next request and keep
                    // looping. Doing this only on some branches would stall the
                    // worker after its first item. Posting `Continue` instead
                    // of requesting directly lets a queued `Stop` be seen
                    // between requests.
                    inbox.Post(Continue)
                    return! loop ()
                | Stop ->
                    Console.WriteLine("Map worker stopped")
                    return ()
            }
            loop ()  )


    /// Creates an unstarted reduce worker.
    /// `reduce` is applied to each piece of work received from
    /// `reduceSupervisor`; on `Fail` the work is queued on the supervisor
    /// again. The value carried by a `Success` result is ignored.
    let getNewReduceWorker(reduce,reduceSupervisor:Agent<SupervisorMsg<'work>>)=//'
        new Agent<WorkerMsg>(fun inbox ->
            let rec loop () = async {
                let! msg = inbox.Receive()
                match msg with
                | Start ->
                    inbox.Post(Continue)
                    return! loop ()
                | Continue ->
                    let! supervisorOrder = requestWork reduceSupervisor
                    match supervisorOrder with
                    | Work(work) ->
                        let! res = reduce work
                        match res with
                        | Success(_) -> ()
                        | Fail ->
                            Console.WriteLine("ReduceFail")
                            reduceSupervisor.Post(WorkRel(ToDo(work)))
                    | NoWork -> ()
                    // Same scheduling rule as the map worker: always continue.
                    inbox.Post(Continue)
                    return! loop ()
                | Stop ->
                    Console.WriteLine("Reduce worker stopped")
                    return ()
            }
            loop() )

open AgentWorker

/// Wires one map supervisor, one reduce supervisor and the requested number
/// of map and reduce workers into a map-reduce pipeline.
///
/// numberMap    - number of map workers to create.
/// numberReduce - number of reduce workers to create.
/// toProcess    - the work items queued on the map supervisor by `Start`.
/// map          - asynchronous map step; a `Success` value is queued for reduction.
/// reduce       - asynchronous reduce step applied to each mapped value.
type MapReduce<'work,'reduce>( numberMap:int , 
                               numberReduce: int, 
                               toProcess:'work list,  
                               map:'work->Async<'reduce WorkOutcome>,
                               reduce:'reduce-> Async<unit WorkOutcome>) = 

    let mapSupervisor = AgentSupervisor.getNew("MapSupervisor")
    let reduceSupervisor = AgentSupervisor.getNew("ReduceSupervisor")

    let workerSupervisors = { Map = mapSupervisor ; Reduce = reduceSupervisor }

    let mapWorkers =
        [ for _ in 1..numberMap ->
            AgentWorker.getNewMapWorker(map, workerSupervisors) ]
    let reduceWorkers =
        [ for _ in 1..numberReduce ->
            AgentWorker.getNewReduceWorker(reduce, workerSupervisors.Reduce) ]

    /// Queues all work on the map supervisor, then starts both supervisors
    /// and every worker.
    member this.Start() =
        // Posting before Start() is fine: the messages wait in the mailbox.
        for elem in toProcess do
            mapSupervisor.Post(WorkRel(ToDo(elem)))
        //Start supervisors
        mapSupervisor.Start()
        reduceSupervisor.Start()
        //start workers
        List.iter AgentWorker.start mapWorkers
        List.iter AgentWorker.start reduceWorkers

    /// Asks both supervisors to print the length of their work lists.
    member this.Status() =
        AgentSupervisor.status mapSupervisor
        AgentSupervisor.status reduceSupervisor

    /// Posts `Stop` to every map worker and every reduce worker.
    /// The two lists are walked independently because `List.map2` raises
    /// when `numberMap` and `numberReduce` differ. The supervisors are left
    /// running, as before.
    member this.Stop() =
        List.iter AgentWorker.stop mapWorkers
        List.iter AgentWorker.stop reduceWorkers

//Run some tests

/// Trivial map step: passes each number through unchanged as a success.
let map (n:int64) = async { return Success n }

/// Trivial reduce step: ignores its input and reports success.
let reduce (_:int64) = async { return Success () }

// One map worker and one reduce worker over the numbers 1..1000000.
let mp = MapReduce<int64,int64>(1, 1, [1L..1000000L], map, reduce)

mp.Start()
mp.Status()
mp.Stop()

推荐答案

我喜欢使用 MailboxProcessor 作为算法的 reduce 部分,以及使用 Async.Parallel 调用的 async 块作为映射部分.它让事情变得更加明确,让您可以更好地控制异常处理、超时和取消.

I like to use MailboxProcessor for the reduce part of the algorithm, and async block that's invoked with Async.Parallel for the map part. It makes things more explicit, giving you finer control over exception handling, timeouts, and cancellation.

以下代码是在 Brian 的帮助下设计的,同时也借助了他出色的 VS2010 F# 代码块高亮插件“F# Depth Colorizer”.

The following code was designed with Brian's help, and with the help of his excellent F# block highlighting "F# Depth Colorizer" plug-in for VS2010.

此代码旨在以 map-reduce 模式从雅虎天气服务器中提取 RSS 提要.它演示了我们如何从实际算法的外部控制执行流程.

This code is meant to pull RSS feeds from yahoo weather server in a map-reduce pattern. It demonstrates how we can control execution flow from the outside of actual algorithm.

fetchWeather 是 map 部分,mailboxLoop 是算法的 reduce 部分.

fetchWeather is the map part, and mailboxLoop is the reduce part of the algorithm.

#r "System.Xml.Linq.dll"

#r "FSharp.PowerPack.dll"

open System
open System.Diagnostics
open System.IO
open System.Linq
open System.Net
open System.Xml.Linq

open Microsoft.FSharp.Control.WebExtensions 

/// Weather reading for one location: city name, region and temperature.
type Weather (city : string, region : string, temperature : int) =
   member this.City = city
   member this.Region = region
   member this.Temperature : int = temperature

   /// Formats the reading as "City, Region: N F".
   override this.ToString() =
      sprintf "%s, %s: %d F" city region temperature

/// Messages accepted by the reducing mailbox.
type MessageForActor =
   /// A successfully parsed weather reading.
   | ProcessWeather of Weather
   /// A failed lookup; carries the woeid that could not be processed.
   | ProcessError of int
   /// Request for the results: (minimum, maximum, all readings).
   | GetResults of AsyncReplyChannel<Weather * Weather * Weather list>

/// Parses a weather RSS document from `rssStream` into a message for the
/// actor: `ProcessWeather` when the feed has a condition element, otherwise
/// `ProcessError woeid`.
let parseRss woeid (rssStream : Stream) =
   let plainName localName = XName.Get localName
   let weatherName localName = XName.Get(localName, "http://xml.weather.yahoo.com/ns/rss/1.0")

   let document = XDocument.Load(rssStream)
   let channel = document.Descendants(plainName "channel").First()
   let location = channel.Element(weatherName "location")
   let condition = channel.Element(plainName "item").Element(weatherName "condition")

   match condition with
   | null ->
      //  If the RSS server returns error, condition XML element won't be available.
      ProcessError(woeid)
   | _ ->
      let temperature = Int32.Parse(condition.Attribute(plainName "temp").Value)
      let city = location.Attribute(plainName "city").Value
      let region = location.Attribute(plainName "region").Value
      ProcessWeather(new Weather(city, region, temperature))

/// The map step: builds an async workflow that downloads the RSS feed for
/// `woeid`, parses it with `parseRss` and posts the resulting message to
/// `actor`.
let fetchWeather (actor : MessageForActor MailboxProcessor) woeid =
   async {
      let rssAddress = sprintf "http://weather.yahooapis.com/forecastrss?w=%d&u=f" woeid
      let webRequest = WebRequest.Create(rssAddress)
      use! response = webRequest.AsyncGetResponse()
      use responseStream = response.GetResponseStream()
      //do! Async.Sleep 1000 // enable this line to see amplified timing that proves concurrent flow
      responseStream
      |> parseRss woeid
      |> actor.Post
   }

/// The reduce step: starts a mailbox that accumulates weather readings,
/// tracking the reading with the lowest and the highest temperature, and
/// replies with (min, max, sorted readings) when it receives `GetResults`.
///
/// `initialCount` is accepted for compatibility with existing callers but is
/// not used: the loop previously threaded a counter derived from it that was
/// never read.
let mailboxLoop (initialCount : int) =
   // Keeps x when `op x.Temperature y.Temperature` holds, otherwise y.
   let chooseCityByTemperature op (x : Weather) (y : Weather) =
      if op x.Temperature y.Temperature then x else y

   // Orders by region first, then by city within a region. This yields the
   // same order as stably sorting by city and then stably sorting by region.
   let sortWeatherByCityAndState (weatherList : Weather list) =
      weatherList
      |> List.sortWith (fun (x : Weather) (y : Weather) ->
            match x.Region.CompareTo(y.Region) with
            | 0 -> x.City.CompareTo(y.City)
            | regionOrder -> regionOrder)

   MailboxProcessor.Start(fun inbox ->
      let rec loop minAcc maxAcc weatherList =
         async {
            let! message = inbox.Receive()

            match message with
            | ProcessWeather weather ->
               let colderCity = chooseCityByTemperature (<) minAcc weather
               let warmerCity = chooseCityByTemperature (>) maxAcc weather
               return! loop colderCity warmerCity (weather :: weatherList)
            | ProcessError woeid ->
               // Errors are recorded as a placeholder reading in the list but
               // do not take part in the min/max tracking.
               let errorWeather = new Weather(sprintf "Error with woeid=%d" woeid, "ZZ", 99999)
               return! loop minAcc maxAcc (errorWeather :: weatherList)
            | GetResults replyChannel ->
               // No recursive call here: replying ends the loop.
               replyChannel.Reply(minAcc, maxAcc, sortWeatherByCityAndState weatherList)
         }

      // Sentinels: any real temperature is below Int32.MaxValue and above
      // Int32.MinValue, so the first reading replaces both.
      let minValueInitial = new Weather("", "", Int32.MaxValue)
      let maxValueInitial = new Weather("", "", Int32.MinValue)
      loop minValueInitial maxValueInitial []
      )

/// Runs `computation` synchronously with a 30000 ms timeout and discards its
/// result. If the computation raises, the exception's message and stack trace
/// are printed and the process exits with code -4; if the timeout elapses, a
/// message is printed and the process exits with code -5.
let RunSynchronouslyWithExceptionAndTimeoutHandlers computation =
   let timeout = 30000
   try
      match Async.RunSynchronously(Async.Catch(computation), timeout) with
      | Choice1Of2 answer ->
         ignore answer
      | Choice2Of2 (except : Exception) ->
         printfn "%s" except.Message
         printfn "%s" except.StackTrace
         exit -4
   with
   | :? System.TimeoutException ->
      printfn "Timed out waiting for results for %d seconds!" (timeout / 1000)
      exit -5

// Script entry point. `main` is a value, so its body runs once, when this
// binding is evaluated.
let main =
   // Should have script name, sync/async select, and at least one woeid
   let args = fsi.CommandLineArgs
   if args.Length < 3 then
      printfn "Expecting at least two arguments!"
      printfn "There were %d arguments" (args.Length - 1)
      exit -1

   // Everything after the script name and the sync/async select is a woeid.
   let woeids =
      try
         [ for arg in Seq.skip 2 args -> Int32.Parse(arg) ]
      with
      | except ->
         printfn "One of supplied arguments was not an integer: %s" except.Message
         exit -2

   let actor = mailboxLoop woeids.Length

   // Runs all fetches as one parallel workflow.
   let fetchAllConcurrently (ids : int list) =
      ids
      |> Seq.map (fetchWeather actor)
      |> Async.Parallel
      |> RunSynchronouslyWithExceptionAndTimeoutHandlers

   // Runs a single fetch to completion before returning.
   let fetchOne woeid =
      RunSynchronouslyWithExceptionAndTimeoutHandlers (fetchWeather actor woeid)

   let stopWatch = Stopwatch.StartNew()
   match args.[1].ToUpper() with
   | "C" ->
      printfn "Concurrent execution:  "
      fetchAllConcurrently woeids
   | "S" ->
      printfn "Synchronous execution: "
      for woeid in woeids do
         fetchOne woeid
   | _ ->
      printfn "Unexpected run options!"
      exit -3

   let (coldest, hottest, weatherList) = actor.PostAndReply GetResults
   stopWatch.Stop()
   assert (weatherList.Length = woeids.Length)

   printfn "{"
   for weather in weatherList do
      printfn "   %O" weather
   printfn "}"
   printfn "Coldest place: %O" coldest
   printfn "Hottest place: %O" hottest
   printfn "Completed in %d millisec" stopWatch.ElapsedMilliseconds

main

这篇关于使用 F# 代理(agent)实现 Map Reduce 的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆