Most efficient (1-loop) way to use Priority Queue in Parallel Jobs task


Question

I am having a hard time with a data-structure related problem. I've tried quite a lot recently, but I don't know how to proceed. The problem is that I have the right output, but the timing is too slow and I don't pass the automated tests.

To solve the problem, I am using a min-heap to implement the priority queue with next free times for the workers -- how could I make it more efficient? Efficiency is critical here.

You have a program which is parallelized and uses m independent threads to process the given list of n jobs. Threads take jobs in the order they are given in the input. If there is a free thread, it immediately takes the next job from the list. If a thread has started processing a job, it doesn’t interrupt or stop until it finishes processing the job. If several threads try to take jobs from the list simultaneously, the thread with the smaller index takes the job. For each job you know exactly how long it will take any thread to process this job, and this time is the same for all the threads. You need to determine for each job which thread will process it and when it will start processing.

Input Format. The first line of the input contains integers m (number of workers) and n (number of jobs). The second line contains n integers: the times in seconds it takes any thread to process a specific job. The times are given in the same order as they are in the list from which threads take jobs.

Output Format. Output exactly n lines. The i-th line (0-based index is used) should contain two space-separated integers: the 0-based index of the thread which will process the i-th job and the time in seconds when it will start processing that job.

from collections import deque
import numpy as np

class solveJobs:
    class Node(dict):

        def __getattr__(self, attr):
            return self.get(attr, None)

    # Nodes are ordered by next free time, with ties broken by the smaller worker index.
    # (np.int was removed from recent NumPy releases; the values are already plain ints.)
    Node.__eq__ = lambda self, other: self.nextfreetime == other.nextfreetime and self.worker == other.worker
    Node.__ne__ = lambda self, other: not (self == other)
    Node.__lt__ = lambda self, other: self.nextfreetime < other.nextfreetime or (self.nextfreetime == other.nextfreetime and self.worker < other.worker)
    Node.__le__ = lambda self, other: self < other or self == other
    Node.__gt__ = lambda self, other: self.nextfreetime > other.nextfreetime or (self.nextfreetime == other.nextfreetime and self.worker > other.worker)
    Node.__ge__ = lambda self, other: self > other or self == other

    class nextfreetimeQueue:

        def __init__(self, nodes):
            self.size = 0
            self.heap = deque([None])
            self.labeled = False

        def __str__(self):
            return str(list(self.heap)[1:])


        def swap(self, i, j):
            '''
            Swap the values of nodes at index i and j.
            '''
            self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
            #        if self.labeled:
            #            I, J = self.heap[i], self.heap[j]
            #            self.position[I.label] = i
            #            self.position[J.label] = j

        def shift_up(self, i):
            '''
            move upward the value at index i to restore heap property.
            '''
            p = i // 2                  # index of parent node
            while p:
                if self.heap[i] < self.heap[p]:
                    self.swap(i, p)     # swap with parent
                i = p                   # new index after swapping with parent
                p = p // 2              # new parent index

        def shift_down(self, i):
            '''
            move downward the value at index i to restore heap property.
            '''
            c = i * 2
            while c <= self.size:
                c = self.min_child(i)
                if self.heap[i] > self.heap[c] or self.heap[i] == self.heap[c]:
                    self.swap(i, c)
                i = c                   # new index after swapping with child
                c = c * 2               # new child index

        def min_child(self, i):
            '''
            Return index of minimum child node.
            '''
            l, r = (i * 2), (i * 2 + 1)     # indices of left and right child nodes
            if r > self.size:
                return l
            else:
                return l if self.heap[l] < self.heap[r] else r

        @property
        def min(self):
            '''
            Return minimum node in heap.
            '''
            return self.heap[1]



        def insert(self, node):
            '''
            Append `node` to the heap and move up
            if necessary to maintain heap property.
            '''
            #        if has_label(node) and self.labeled:
            #            self.position[node.label] = self.size
            self.heap.append(node)
            self.size += 1
            self.shift_up(self.size)


    def read_data(self):
        self.num_workers, jobcount = map(int, input().split())  # first number is the number of workers, second is the number of jobs
        self.job_durations = list(map(int, input().split()))  # one duration per job, in input order
        self.wq = self.nextfreetimeQueue([])  # nested classes must be reached through self
        for i in range(self.num_workers):
            self.wq.insert(self.Node(worker=i+1, nextfreetime=0))
        # assert jobcount == len(self.job_durations)
        self.assigned_workers = [None] * len(self.job_durations) # which thread takes
        self.start_times = [None] * len(self.job_durations) # WHEN A JOB IS STARTED

    def write_response(self):
        for i in range(len(self.job_durations)): # for each job, do:
          print(self.assigned_workers[i]-1, self.start_times[i]) # print the worker and when it starts the JOB I


    def assign_jobs(self):

        for i in range(len(self.job_durations)): # loop over all jobs
          next_worker_node =  self.wq.min # finds the minimum free time dict (worker, nextfreetime)
          # nft = next_worker_node['nextfreetime']
          self.assigned_workers[i] = next_worker_node['worker'] # assign the worker index to the list
          self.start_times[i] = next_worker_node['nextfreetime'] # assign that worker's next free time to job starting time
          self.wq.min['nextfreetime'] += self.job_durations[i] # increase workers next free time
          self.wq.shift_down(1)

    def solve(self):
        self.read_data()
        self.assign_jobs()
        self.write_response()

Recommended answer

A few things come to mind after a quick read through your code.

First, unless there's some compelling reason to write your own min heap, you probably should just use the existing heapq.
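
As a rough sketch of what that could look like, the whole assignment loop fits in a few lines with heapq. The function name assign_jobs and the (next_free_time, worker) tuple layout are choices made for this illustration, not something taken from the question's code; workers are 0-based here, so no +1/-1 adjustment is needed.

```python
import heapq


def assign_jobs(num_workers, job_durations):
    """Return one (worker, start_time) pair per job, in input order."""
    # Each entry is (next_free_time, worker). Tuples compare element by
    # element, so a tie on time is broken by the smaller worker index.
    # This list is sorted ascending, so it already satisfies the heap invariant.
    heap = [(0, worker) for worker in range(num_workers)]
    schedule = []
    for duration in job_durations:
        start, worker = heap[0]  # the worker that becomes free first
        schedule.append((worker, start))
        # Replace the root with the updated entry using a single sift.
        heapq.heapreplace(heap, (start + duration, worker))
    return schedule


if __name__ == "__main__":
    for worker, start in assign_jobs(2, [1, 2, 3, 4, 5]):
        print(worker, start)
```

heapq.heapreplace pops the smallest item and pushes the new one in one pass, which matches the "update the root, then sift down" step in the question.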

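A standard list will probably be faster than a deque in this case. All of your insertions and removals are at the end of the array, so you don't incur the O(n) costs of inserting into and removing from the middle or the front, which is the only thing a deque would save you. A heap also indexes into the middle of the container all the time, and indexing a deque slows toward O(n) in the middle, while indexing a list is O(1).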
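
If you want to check this on your own machine, a small timing sketch along these lines may help. The helper name middle_index_time and the container size are arbitrary choices for the illustration, and the absolute numbers will vary between machines and Python versions.

```python
import timeit
from collections import deque


def middle_index_time(container, repeats=20000):
    """Time `repeats` reads of the middle element of `container`."""
    mid = len(container) // 2
    return timeit.timeit(lambda: container[mid], number=repeats)


size = 200_000
list_seconds = middle_index_time(list(range(size)))
deque_seconds = middle_index_time(deque(range(size)))
print(f"list:  {list_seconds:.4f} s")
print(f"deque: {deque_seconds:.4f} s")
```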

There are two problems, one minor and one major, with your shift_down code. You have this loop:

while c <= self.size:
    c = self.min_child(i)
    if self.heap[i] > self.heap[c] or self.heap[i] == self.heap[c]:
        self.swap(i, c)
    i = c                   # new index after swapping with child
    c = c * 2               # new child index

The major problem is that it always does log(n) iterations through the loop, where n is the number of nodes in the heap, even if the item you're sifting down already belongs at the top. You can avoid that by exiting the loop as soon as self.heap[i] < self.heap[c].

The minor problem is that there's no good reason to check for self.heap[i] == self.heap[c], because that can't happen in your program. If two nodes in the heap have the same value, it would mean that you have the same worker in the queue multiple times.

You can fix both problems with this simple change:

while c <= self.size:
    c = self.min_child(i)
    if self.heap[i] < self.heap[c]:
        break         # node is where it belongs. Exit loop
    # otherwise, swap and go around again
    self.swap(i, c)
    i = c
    c = c * 2

Executing the loop too many times is probably your biggest performance problem, especially if the number of workers is large.
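
To make the saving concrete, here is a standalone sketch of the early-exit sift-down on a 1-indexed list, with an iteration counter added purely for demonstration. The free functions min_child and sift_down and the (nextfreetime, worker) tuples are stand-ins for the question's methods and Node objects, not the original code.

```python
def min_child(heap, size, i):
    """Index of the smaller child of node i in a 1-indexed heap."""
    left, right = 2 * i, 2 * i + 1
    if right > size or heap[left] < heap[right]:
        return left
    return right


def sift_down(heap, size, i):
    """Restore the heap property below index i; return the loop iteration count."""
    iterations = 0
    while 2 * i <= size:
        iterations += 1
        c = min_child(heap, size, i)
        if heap[i] < heap[c]:
            break  # node is where it belongs
        heap[i], heap[c] = heap[c], heap[i]
        i = c
    return iterations


# 1-indexed heap of (nextfreetime, worker) pairs; index 0 is an unused placeholder.
workers = 1023
heap = [None] + [(10, w) for w in range(workers)]
heap[1] = (0, 0)  # the root worker just took a zero-length job
print(sift_down(heap, workers, 1))
```

With 1023 workers the root has nine levels below it, so a loop without the break would go around nine times for this update, while the version above stops after the first comparison.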

There's a faster way to build the heap originally. A sorted list is a valid min-heap. So you could just initialize your heap with the worker numbers in order. No need to pay the O(n log n) cost to insert the individual workers (n here being the number of workers), when you can initialize the heap in O(n). If your workers aren't in sorted order, you can still initialize a heap in O(n). See "How can building a heap be O(n) time complexity?", or check out the heapify(x) function in the heapq source code.
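
Both cases can be sketched briefly with heapq. Note that heapq uses 0-based indexing, unlike the 1-based heap in the question; the is_min_heap helper and the sample data are made up for this illustration.

```python
import heapq


def is_min_heap(items):
    """Check the 0-indexed heap invariant: every parent <= its children."""
    return all(items[(i - 1) // 2] <= items[i] for i in range(1, len(items)))


num_workers = 8

# Already sorted by (nextfreetime, worker), so it is a valid heap as built.
sorted_workers = [(0, w) for w in range(num_workers)]
print(is_min_heap(sorted_workers))

# Arbitrary order: heapify rearranges the list in place in linear time.
unsorted_workers = [(5, 3), (0, 7), (2, 1), (9, 0), (0, 2), (4, 6), (1, 5), (3, 4)]
heapq.heapify(unsorted_workers)
print(is_min_heap(unsorted_workers), unsorted_workers[0])
```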
