Allocating tasks to parallel workers so that expected cost is roughly equal


Problem Description



    I have an assignment problem where I'm trying to allocate a number of tasks with a known expected cost (runtime in seconds) to X parallel workers, subject to the constraint that each worker receives the same number of tasks (save for remainders), so that the total expected runtime per worker is roughly equal.

    I'm using a data frame that defines the tasks to be executed, and for each task I can calculate a pretty accurate expected cost (runtime in seconds). E.g. something like this:

    library("tibble")
    
    set.seed(1232)
    tasks <- tibble(task = 1:20, cost = runif(20, min = 1, max = 5)^2)
    head(tasks)
    #> # A tibble: 6 x 2
    #>    task  cost
    #>   <int> <dbl>
    #> 1     1 22.5 
    #> 2     2 20.0 
    #> 3     3 21.3 
    #> 4     4  8.13
    #> 5     5 18.3 
    #> 6     6 19.6
    

    Created on 2019-11-21 by the reprex package (v0.3.0)

    This is then used with foreach::foreach(...) %dopar% ... to execute the tasks in parallel. foreach() splits the tasks into roughly equal-sized groups of size nrow(tasks)/X, where X is the number of parallel workers (cores).
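
    For concreteness, a minimal sketch of that setup, assuming a doParallel backend (run_task() is a hypothetical stand-in for the real per-task work):

    library(foreach)
    library(doParallel)
    
    X <- 4
    cl <- makeCluster(X)
    registerDoParallel(cl)
    
    # As described above, the iterations get handed to the X workers in roughly
    # equal-sized chunks, so the row order of `tasks` decides which costs end up
    # on the same worker.
    results <- foreach(i = seq_len(nrow(tasks)), .combine = c) %dopar% {
      run_task(tasks$task[i])  # placeholder for the actual expensive computation
    }
    
    stopCluster(cl)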

    I'm currently shuffling the task list so that the cost is roughly equal for each worker, but there can still be substantial deviations, i.e. some workers get finished much earlier than others and thus it would have been better if they had had some more costly tasks. E.g.:

    # shuffle tasks (in the original application cost is not random initially)
    tasks <- tasks[sample(1:nrow(tasks)), ]
    
    # number of workers
    X <- 4
    tasks$worker <- rep(1:X, each = nrow(tasks)/X)
    
    # expected total cost (runtime in s) per worker
    sapply(split(tasks$cost, tasks$worker), sum)
    #>        1        2        3        4 
    #> 77.25278 35.25026 66.09959 64.05435
    

    Created on 2019-11-21 by the reprex package (v0.3.0)

    The second worker finishes in about half the time of the other workers, so its capacity is wasted and the whole run takes longer to finish than it needs to.

    What I'd like to do instead is have a way of re-ordering the task data frame so that when foreach splits it into X groups the total expected cost per group is more even.

    I imagine this is a super-well known kind of problem and I just don't know the right verbiage to google (nor how to do it in R). Thanks for any help.

    (EDIT) Mostly better alternative

    For now, a relatively simple alternative that seems to do better than random shuffling. This orders the tasks by cost, assigns the first X tasks to workers 1 to X, then assigns the next chunk of X tasks in reverse order to workers X to 1, etc (this is "alt1" below).
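
    As a quick standalone sketch of that ordering (the same assignment appears inside race() further down):

    X <- 4
    tasks <- tasks[order(tasks$cost), ]
    tasks$worker <- rep(c(1:X, X:1), length.out = nrow(tasks))
    
    # expected total cost per worker is now much more even than with shuffling
    sapply(split(tasks$cost, tasks$worker), sum)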

    (EDIT2) Added the RcppAlgos method

    By Joseph Wood below.

    library("tibble")
    library("dplyr")
    #> 
    #> Attaching package: 'dplyr'
    #> The following objects are masked from 'package:stats':
    #> 
    #>     filter, lag
    #> The following objects are masked from 'package:base':
    #> 
    #>     intersect, setdiff, setequal, union
    library("ggplot2")
    library("tidyr")
    library("RcppAlgos")
    
    getPartitions <- function(df, nWorkers, tol_ratio = 0.0001) {
    
      nSections <- nrow(df) / nWorkers
      avg <- sum(df$cost) / nWorkers
      tol <- avg * tol_ratio
      vec <- df$cost
      cond <- TRUE
      part <- list()
    
      for (i in 1:(nWorkers - 1)) {
          while (cond) {
              vals <- comboGeneral(vec, nSections,
                                   constraintFun = "sum",
                                   comparisonFun = "==",
                                   limitConstraints = avg + (tol / 2),
                                   tolerance = tol,
                                   upper = 1)
    
              cond <- nrow(vals) == 0
    
              if (cond) {
                  tol <- tol * 2
              } else {
                  v <- match(as.vector(vals), df$cost)
              }
          }
    
          part[[i]] <- v
          vec <- df$cost[-(do.call(c, part))]
          avg <- sum(vec) / (nWorkers - i)
          tol <- avg * tol_ratio
          cond <- TRUE
      }
    
      part[[nWorkers]] <- which(!1:nrow(df) %in% do.call(c, part))
      part
    }
    
    race <- function() {
      N_TASKS = 100
      X = 4
      tasks <- tibble(task = 1:N_TASKS, cost = runif(N_TASKS, min = 1, max = 10)^2)
    
      # random shuffle
      tasks$worker <- rep(1:X, each = nrow(tasks)/X)
      rando <- max(sapply(split(tasks$cost, tasks$worker), sum))
    
      # alternative 1
      tasks <- tasks[order(tasks$cost), ]
      tasks$worker <- rep(c(1:X, X:1), length.out = nrow(tasks))
      alt1 <- max(sapply(split(tasks$cost, tasks$worker), sum))
    
      # modified version of ivan100sic's answer
      # sort by descending cost, after initial allocation, allocate costly tasks
      # first to workers with lowest total cost so far
      group <- factor(rep(1:(ceiling(nrow(tasks)/X)), each = X))
      tasks <- tasks[order(tasks$cost, decreasing = TRUE), ]
      tasks$worker <- c(1:X, rep(NA, length.out = nrow(tasks) - X))
      task_sets <- split(tasks, group)
      task_sets[[1]]$worker <- 1:X
      for (i in 2:length(task_sets)) {
        # get current total cost by worker
        total <- task_sets %>% 
          bind_rows() %>%
          filter(!is.na(worker)) %>%
          group_by(worker) %>%
          summarize(cost = sum(cost)) %>%
          arrange(cost)
        task_sets[[i]]$worker <- total[["worker"]]
      }
      tasks <- bind_rows(task_sets)
      alt2  <- max(sapply(split(tasks$cost, tasks$worker), sum))
    
      # RcppAlgos by Joseph Wood below
      setParts <- getPartitions(tasks, X)
      worker   <- rep(1:X, each = N_TASKS/X)
      row_num  <- unsplit(setParts, worker)
      tasks$worker <- worker[order(row_num)]
      algo <- max(sapply(split(tasks$cost, tasks$worker), sum))
    
    
      c(ref = sum(tasks$cost) / X, rando = rando, alt1 = alt1, alt2 = alt2, algo = algo)
    }
    
    set.seed(24332)
    sims <- replicate(1e3, race())
    sims <- sims %>%
      t() %>%
      as_tibble() %>%
      pivot_longer(rando:algo, names_to = "Method")
    
    ggplot(sims, aes(x = value, color = Method)) + 
      geom_density() +
      scale_x_continuous(limits = c(0, max(sims$value))) +
      labs(x = "Total runtime (s)")
    

    
    # this shows the estimated runtime relative to average total cost
    # per worker (which may be unobtainable)
    sims %>%
      group_by(Method) %>%
      summarize(time_relative_to_ref = mean(value - ref)) %>%
      arrange(time_relative_to_ref)
    #> # A tibble: 4 x 2
    #>   Method time_relative_to_ref
    #>   <chr>                 <dbl>
    #> 1 algo                 0.0817
    #> 2 alt2                 0.307 
    #> 3 alt1                 4.97  
    #> 4 rando              154.
    

    Created on 2020-02-04 by the reprex package (v0.3.0)

    • "rando": randomly shuffle the task list
    • "alt1": sort tasks by cost and alternate assigning to worker 1 to X, X to 1, etc.
    • "alt2": based on ivan100sic's answer below, after the first allocation to workers 1 to X, allocate based on total cost per worker so far
    • "algo": based on Joseph Woods's answer below

    Solution

    As @JohnColeman points out, this essentially boils down to partitioning. We are trying to partition the tasks equally such that the sum of the cost doesn't vary wildly.

    The algorithm below does just that. The main idea is to successively find a set of tasks whose sum is close to the average. Once we find one, we remove them, and continue selecting.

    The workhorse of the algorithm below is comboGeneral from RcppAlgos*. This function allows one to find combinations of a vector that meet a constraint. In this case, we are looking for 5 numbers (20 tasks split across 4 workers) whose sum is close to sum(tasks$cost) / (number of workers) ~ 60.66425. Since we are looking for sums that are close but not exact, we can bound our constraint. That is, we can look for combinations such that the sum is within a given tolerance.
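
    For instance, a single constrained search for one worker's share of the 20-task example might look like the sketch below; the full getPartitions() that follows wraps this call in a retry loop that widens the tolerance whenever nothing is found:

    library(RcppAlgos)
    
    target <- sum(tasks$cost) / 4          # ~ 60.66, the per-worker average
    tol    <- target * 0.0001
    
    one_set <- comboGeneral(tasks$cost, 5,
                            constraintFun    = "sum",
                            comparisonFun    = "==",
                            limitConstraints = target + tol / 2,
                            tolerance        = tol,
                            upper            = 1)  # stop after the first match
    
    # may have zero rows if no combination falls within `tol`;
    # getPartitions() handles that by doubling `tol` and retrying
    nrow(one_set)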

    library(RcppAlgos)
    
    getPartitions <- function(df, nWorkers, tol_ratio = 0.0001) {
    
        nSections <- nrow(df) / nWorkers
        avg <- sum(df$cost) / nWorkers
        tol <- avg * tol_ratio
        vec <- df$cost
        cond <- TRUE
        part <- list()
    
        for (i in 1:(nWorkers - 1)) {
            while (cond) {
                vals <- comboGeneral(vec, nSections,
                                     constraintFun = "sum",
                                     comparisonFun = "==",
                                     limitConstraints = avg + (tol / 2),
                                     tolerance = tol,
                                     upper = 1)
    
                cond <- nrow(vals) == 0
    
                if (cond) {
                    tol <- tol * 2
                } else {
                    v <- match(as.vector(vals), df$cost)
                }
            }
    
            part[[i]] <- v
            vec <- df$cost[-(do.call(c, part))]
            avg <- sum(vec) / (nWorkers - i)
            tol <- avg * tol_ratio
            cond <- TRUE
        }
    
        part[[nWorkers]] <- which(!1:nrow(df) %in% do.call(c, part))
        part
    }
    

    The output for the example given by the OP is as follows:

    getPartitions(tasks, 4)
    [[1]]
    [1] 11 13  8 14 10
    
    [[2]]
    [1] 12  4 20  2 16
    
    [[3]]
    [1] 19  9 18  5  6
    
    [[4]]
    [1]  1  3  7 15 17
    

    These are the rows from tasks that are to be passed to each worker. It runs instantly and returns a pretty even workload. Here are the estimated times for each worker:

    sapply(getPartitions(tasks, 4), function(x) {
        sum(tasks$cost[x])
    })
    [1] 60.67292 60.66552 60.80399 60.51455
    

    This is pretty good given that the ideal time would be mean(tasks$cost) * 5 ~= 60.66425.

    Let's see how it performs. Below is a modified script for plotting that takes into account how varied each result is for a given method. We measure this with sd (standard deviation). It also returns the ideal solution for reference.

    library("tibble")
    library("dplyr")
    #> 
    #> Attaching package: 'dplyr'
    #> The following objects are masked from 'package:stats':
    #> 
    #>     filter, lag
    #> The following objects are masked from 'package:base':
    #> 
    #>     intersect, setdiff, setequal, union
    library("ggplot2")
    library("tidyr")
    
    race <- function() {
        N_TASKS = 100
        X = 4
        tasks <- tibble(task = 1:N_TASKS, cost = runif(N_TASKS, min = 1, max = 10)^2)
        ideal_soln <- sum(tasks$cost) / X
    
        # random shuffle
        tasks$worker <- rep(1:X, each = nrow(tasks)/X)
        rando_mx <- max(sapply(split(tasks$cost, tasks$worker), sum))
        rando_sd <- sd(sapply(split(tasks$cost, tasks$worker), sum))
    
        # alternative 1
        tasks <- tasks[order(tasks$cost), ]
        tasks$worker <- rep(c(1:X, X:1), length.out = nrow(tasks))
        alt1_mx <- max(sapply(split(tasks$cost, tasks$worker), sum))
        alt1_sd <- sd(sapply(split(tasks$cost, tasks$worker), sum))
    
        # modified version of ivan100sic's answer
        # sort by descending cost, after initial allocation, allocate costly tasks
        # first to workers with lowest total cost so far
        group <- factor(rep(1:(ceiling(nrow(tasks)/X)), each = X))
        tasks <- tasks[order(tasks$cost, decreasing = TRUE), ]
        tasks$worker <- c(1:X, rep(NA, length.out = nrow(tasks) - X))
        task_sets <- split(tasks, group)
        task_sets[[1]]$worker <- 1:X
        for (i in 2:length(task_sets)) {
            # get current total cost by worker
            total <- task_sets %>% 
                bind_rows() %>%
                filter(!is.na(worker)) %>%
                group_by(worker) %>%
                summarize(cost = sum(cost)) %>%
                arrange(cost)
            task_sets[[i]]$worker <- total[["worker"]]
        }
        tasks <- bind_rows(task_sets)
        alt2_mx  <- max(sapply(split(tasks$cost, tasks$worker), sum))
        alt2_sd  <- sd(sapply(split(tasks$cost, tasks$worker), sum))
    
        ## RcppAlgos solution
        setParts <- getPartitions(tasks, X)
        algos_mx <- max(sapply(setParts, function(x) sum(tasks$cost[x])))
        algos_sd <- sd(sapply(setParts, function(x) sum(tasks$cost[x])))
    
        c(target_soln = ideal_soln,rando_max = rando_mx, alt1_max = alt1_mx,
          alt2_max = alt2_mx, algos_max = algos_mx, rando_std_dev = rando_sd,
          alt1_std_dev = alt1_sd, alt2_std_dev = alt2_sd, algos_std_dev = algos_sd)
    }
    
    set.seed(24332)
    system.time(sims <- replicate(1e3, race()))
    sims %>%
        t() %>%
        as_tibble() %>%
        pivot_longer(rando_std_dev:algos_std_dev, names_to = "Method") %>%
        ggplot(aes(x = value, color = Method)) + 
        geom_density() +
        scale_x_continuous(limits = c(0, 100)) +
        labs(x = "Standard Deviation (s)")
    Warning message:
    Removed 719 rows containing non-finite values (stat_density).
    

    It is hard to tell what is going on because the standard deviation for the rando method is so large. If we just look at alt1, alt2, and the algos approach we have:

    sims %>%
        t() %>%
        as_tibble() %>%
        pivot_longer(alt1_std_dev:algos_std_dev, names_to = "Method") %>%
        ggplot(aes(x = value, color = Method)) + 
        geom_density() +
        scale_x_continuous(limits = c(0, 5)) +
        labs(x = "Standard Deviation (s)")
    Warning message:
    Removed 335 rows containing non-finite values (stat_density)
    

    And now alt2 and algos:

    sims %>%
        t() %>%
        as_tibble() %>%
        pivot_longer(alt2_std_dev:algos_std_dev, names_to = "Method") %>%
        ggplot(aes(x = value, color = Method)) + 
        geom_density() +
        scale_x_continuous(limits = c(0, 1.7)) +
        labs(x = "Standard Deviation (s)")
    

    As you can see, the RcppAlgos solution gives the most balanced load every time.

    And finally, here is an illustration that demonstrates how close each method is to the target solution:

    summary(abs(t(sims)[, "algos_max"] - t(sims)[, "target_soln"]))
        Min.  1st Qu.   Median     Mean  3rd Qu.     Max. 
    0.003147 0.057913 0.081986 0.081693 0.106312 0.179099 
    
    summary(abs(t(sims)[, "alt2_max"] - t(sims)[, "target_soln"]))
       Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
    0.01175 0.14321 0.23916 0.30730 0.40949 2.03156
    
    summary(abs(t(sims)[, "alt1_max"] - t(sims)[, "target_soln"]))
      Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
    0.4979  2.9815  4.4725  4.9660  6.3220 16.5716 
    
    summary(abs(t(sims)[, "rando_max"] - t(sims)[, "target_soln"]))
     Min. 1st Qu.  Median    Mean 3rd Qu.    Max. 
    13.16   98.14  143.64  154.10  200.41  427.81
    

    We see that the RcppAlgos solution is around 3-4 times closer on average to the target solution than the second best method (alt2 in this case).

    Update

    For the most part, the alt2/alt1 methods perform relatively well and are very simple, which is a huge plus. However, there are many cases where they will fail. For example, given X workers and X - 1 tasks that you know take a substantially longer time than the other tasks, since those methods rely on sorting, they will predictably allocate too much work to X - 1 of the workers. Simply change the following line in the function race():

    ## Original
    tasks <- tibble(task = 1:N_TASKS, cost = runif(N_TASKS, min = 1, max = 10)^2)
    
    ## Modified
    tasks <- tibble(task = 1:N_TASKS, cost = c(runif(X - 1, 15, 25),
                                               runif(N_TASKS - X + 1, min = 1, max = 10))^2)
    

    Now rerun and observe:

    set.seed(24332)
    sims <- replicate(1e3, race())
    sims <- sims %>%
        t() %>%
        as_tibble() %>%
        pivot_longer(rando:algo, names_to = "Method")
    
    ggplot(sims, aes(x = value, color = Method)) + 
        geom_density() +
        scale_x_continuous(limits = c(0, max(sims$value))) +
        labs(x = "Total runtime with Large Gap (s)")
    

    sims %>%
        group_by(Method) %>%
        summarize(time_relative_to_ref = mean(value - ref)) %>%
        arrange(time_relative_to_ref)
    # A tibble: 4 x 2
    Method time_relative_to_ref
    <chr>                 <dbl>
    1 algo                  0.109
    2 alt2                150.   
    3 alt1                184.   
    4 rando               839.
    

    Although this is a contrived example, it shows that since the alt1/alt2 solutions make assumptions about the underlying data, they will inevitably fail when presented with a more general problem.

    * Disclosure: I am the author of RcppAlgos
