Solving embarrassingly parallel problems using Python multiprocessing


Problem description

How does one use multiprocessing to tackle embarrassingly parallel problems?

Embarrassingly parallel problems typically consist of three basic parts:

  1. Read input data (from a file, database, tcp connection, etc.).
  2. Run calculations on the input data, where each calculation is independent of any other calculation.
  3. Write results of calculations (to a file, database, tcp connection, etc.).

We can parallelize the program in two dimensions:

  • Part 2 can run on multiple cores, because each calculation is independent of the others; the order of processing doesn't matter.
  • Each part can run independently: part 1 can place data on an input queue, part 2 can pull data off the input queue and put results onto an output queue, and part 3 can pull results off the output queue and write them out.

This seems a most basic pattern in concurrent programming, but I am still lost in trying to solve it, so let's write a canonical example to illustrate how this is done using multiprocessing.

Here is the example problem: Given a CSV file with rows of integers as input, compute their sums. Separate the problem into three parts, which can all run in parallel:

  1. Process the input file into raw data (lists/iterables of integers)
  2. Calculate the sums of the data, in parallel
  3. Output the sums

Below is a traditional, single-process Python program that solves these three tasks:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# basicsums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file.
"""

import csv
import optparse
import sys

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums
"""])
    cli_parser = optparse.OptionParser(usage)
    return cli_parser


def parse_input_csv(csvfile):
    """Parses the input CSV and yields tuples with the index of the row
    as the first element, and the integers of the row as the second
    element.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.reader` instance

    """
    for i, row in enumerate(csvfile):
        row = [int(entry) for entry in row]
        yield i, row


def sum_rows(rows):
    """Yields a tuple with the index of each input list of integers
    as the first element, and the sum of the list of integers as the
    second element.

    The index is zero-index based.

    :Parameters:
    - `rows`: an iterable of tuples, with the index of the original row
      as the first element, and a list of integers as the second element

    """
    for i, row in rows:
        yield i, sum(row)


def write_results(csvfile, results):
    """Writes a series of results to an outfile, where the first column
    is the index of the original row of data, and the second column is
    the result of the calculation.

    The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.writer` instance to which to write results
    - `results`: an iterable of tuples, with the index (zero-based) of
      the original row as the first element, and the calculated result
      from that row as the second element

    """
    for result_row in results:
        csvfile.writerow(result_row)


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)
    # gets an iterable of rows that's not yet evaluated
    input_rows = parse_input_csv(in_csvfile)
    # sends the rows iterable to sum_rows() for results iterable, but
    # still not evaluated
    result_rows = sum_rows(input_rows)
    # finally evaluation takes place as a chain in write_results()
    write_results(out_csvfile, result_rows)
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

Let's take this program and rewrite it to use multiprocessing to parallelize the three parts outlined above. Below is a skeleton of the new, parallelized program, which needs to be fleshed out to address the parts in the comments:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out their
sums to another CSV file, using multiple processes if desired.
"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser


def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)

    # Parse the input file and add the parsed data to a queue for
    # processing, possibly chunking to decrease communication between
    # processes.

    # Process the parsed data as soon as any (chunks) appear on the
    # queue, using as many processes as allotted by the user
    # (opts.numprocs); place results on a queue for output.
    #
    # Terminate processes when the parser stops putting data in the
    # input queue.

    # Write the results to disk as soon as they appear on the output
    # queue.

    # Ensure all child processes have terminated.

    # Clean up files.
    infile.close()
    outfile.close()


if __name__ == '__main__':
    main(sys.argv[1:])

These pieces of code, as well as another piece of code that can generate example CSV files for testing purposes, can be found on github.
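For readers who want a quick stand-in before fetching that repository, a test-file generator along those lines might look like this (a hypothetical Python 3 sketch; the function name and parameters are mine, not from the github code):

```python
import csv
import random

def make_example_csv(path, num_rows=100, row_len=10, max_int=99):
    """Write num_rows rows of row_len random integers to the CSV at path."""
    with open(path, 'w', newline='') as f:
        writer = csv.writer(f)
        for _ in range(num_rows):
            writer.writerow([random.randint(0, max_int) for _ in range(row_len)])
```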

I would appreciate any insight here as to how you concurrency gurus would approach this problem.

Here are some questions I had when thinking about this problem. Bonus points for addressing any/all:

  • Should I have child processes for reading in the data and placing it into the queue, or can the main process do this without blocking until all input is read?
  • Likewise, should I have a child process for writing the results out from the processed queue, or can the main process do this without having to wait for all the results?
  • Should I use a process pool for the sum operations?
    • If yes, what method do I call on the pool to get it to start processing the results coming into the input queue, without blocking the input and output processes, too? apply_async()? map_async()? imap()? imap_unordered()?
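On that last point, one illustrative sketch (Python 3; an option, not necessarily the best answer) uses Pool.imap_unordered, which hands back results as workers finish; carrying the row index lets the consumer restore input order afterwards:

```python
import multiprocessing

def sum_row(indexed_row):
    """Worker: take an (index, row) pair and return (index, sum)."""
    i, row = indexed_row
    return i, sum(row)

if __name__ == '__main__':
    data = [(0, [1, 2, 3]), (1, [4, 5, 6]), (2, [7, 8, 9])]
    with multiprocessing.Pool(processes=2) as pool:
        # Results may arrive in any order; collect into a dict keyed by index.
        results = dict(pool.imap_unordered(sum_row, data))
    print([results[i] for i in sorted(results)])  # [6, 15, 24]
```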

Recommended answer

My solution has an extra bell and whistle to make sure that the order of the output is the same as the order of the input. I use multiprocessing.Queue to send data between processes, sending "STOP" messages so each process knows to quit checking the queues. I think the comments in the source should make it clear what's going on, but if not, let me know.
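The sentinel idiom this relies on, iter(queue.get, "STOP"), calls get() repeatedly until it returns the sentinel value. In miniature (a Python 3 sketch, separate from the full program below):

```python
import multiprocessing

def worker(inq, outq):
    # iter(inq.get, "STOP") keeps calling inq.get() until it returns "STOP".
    for item in iter(inq.get, "STOP"):
        outq.put(item * 2)
    outq.put("STOP")  # propagate the sentinel so the consumer can finish

if __name__ == '__main__':
    inq = multiprocessing.Queue()
    outq = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(inq, outq))
    p.start()
    for n in [1, 2, 3]:
        inq.put(n)
    inq.put("STOP")
    results = [item for item in iter(outq.get, "STOP")]
    p.join()
    print(results)  # [2, 4, 6]
```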

      #!/usr/bin/env python
      # -*- coding: UTF-8 -*-
      # multiproc_sums.py
      """A program that reads integer values from a CSV file and writes out their
      sums to another CSV file, using multiple processes if desired.
      """
      
      import csv
      import multiprocessing
      import optparse
      import sys
      
      NUM_PROCS = multiprocessing.cpu_count()
      
      def make_cli_parser():
          """Make the command line interface parser."""
          usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
                  __doc__,
                  """
      ARGUMENTS:
          INPUT_CSV: an input CSV file with rows of numbers
          OUTPUT_CSV: an output file that will contain the sums
      """])
          cli_parser = optparse.OptionParser(usage)
          cli_parser.add_option('-n', '--numprocs', type='int',
                  default=NUM_PROCS,
                  help="Number of processes to launch [DEFAULT: %default]")
          return cli_parser
      
      class CSVWorker(object):
          def __init__(self, numprocs, infile, outfile):
              self.numprocs = numprocs
              self.infile = open(infile)
              self.outfile = outfile
              self.in_csvfile = csv.reader(self.infile)
              self.inq = multiprocessing.Queue()
              self.outq = multiprocessing.Queue()
      
              self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
              self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
              self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                              for i in range(self.numprocs)]
      
              self.pin.start()
              self.pout.start()
              for p in self.ps:
                  p.start()
      
              self.pin.join()
              i = 0
              for p in self.ps:
                  p.join()
                  print "Done", i
                  i += 1
      
              self.pout.join()
              self.infile.close()
      
          def parse_input_csv(self):
                  """Parses the input CSV and yields tuples with the index of the row
                  as the first element, and the integers of the row as the second
                  element.
      
                  The index is zero-index based.
      
                  The data is then sent over inqueue for the workers to do their
                  thing.  At the end the input process sends a 'STOP' message for each
                  worker.
                  """
                  for i, row in enumerate(self.in_csvfile):
                      row = [ int(entry) for entry in row ]
                      self.inq.put( (i, row) )
      
                  for i in range(self.numprocs):
                      self.inq.put("STOP")
      
          def sum_row(self):
              """
              Workers. Consume inq and produce answers on outq
              """
              tot = 0
              for i, row in iter(self.inq.get, "STOP"):
                      self.outq.put( (i, sum(row)) )
              self.outq.put("STOP")
      
          def write_output_csv(self):
              """
              Open outgoing csv file then start reading outq for answers
              Since I chose to make sure output was synchronized to the input,
              there are some extra goodies to do that.
      
              Obviously your input has the original row number so this is not
              required.
              """
              cur = 0
              stop = 0
              buffer = {}
              # For some reason csv.writer works badly across processes so open/close
              # and use it all in the same process or else you'll have the last
              # several rows missing
              outfile = open(self.outfile, "w")
              self.out_csvfile = csv.writer(outfile)
      
              #Keep running until we see numprocs STOP messages
              for works in range(self.numprocs):
                  for i, val in iter(self.outq.get, "STOP"):
                      # verify rows are in order, if not save in buffer
                      if i != cur:
                          buffer[i] = val
                      else:
                        # if yes, write it out, then flush any buffered rows now in order
                          self.out_csvfile.writerow( [i, val] )
                          cur += 1
                          while cur in buffer:
                              self.out_csvfile.writerow([ cur, buffer[cur] ])
                              del buffer[cur]
                              cur += 1
      
              outfile.close()
      
      def main(argv):
          cli_parser = make_cli_parser()
          opts, args = cli_parser.parse_args(argv)
          if len(args) != 2:
              cli_parser.error("Please provide an input file and output file.")
      
          c = CSVWorker(opts.numprocs, args[0], args[1])
      
      if __name__ == '__main__':
          main(sys.argv[1:])
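The reordering logic in write_output_csv can be pulled out as a small pure function to see it in isolation (a sketch; the name reorder is mine, not from the answer):

```python
def reorder(results):
    """Yield (index, value) pairs in index order, given pairs whose indices
    are 0, 1, 2, ... with no gaps but which may arrive out of order."""
    cur = 0
    buffer = {}
    for i, val in results:
        if i != cur:
            buffer[i] = val               # arrived early: park it
        else:
            yield i, val
            cur += 1
            while cur in buffer:          # flush any parked rows now in order
                yield cur, buffer.pop(cur)
                cur += 1

print(list(reorder([(1, 'b'), (0, 'a'), (3, 'd'), (2, 'c')])))
# [(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
```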
      

