Streaming wrapper around program that writes to multiple output files

Problem description

There is a program (which I cannot modify) that creates two output files. I am trying to write a Python wrapper that invokes this program, reads both output streams simultaneously, combines the output, and prints to stdout (to facilitate streaming). How can I do this without deadlocking? The proof of concept below works fine, but when I apply this approach to the actual program it deadlocks.

Proof of concept: this is a dummy program, bogus.py, that creates two output files like the program I'm trying to wrap.

#!/usr/bin/env python
from __future__ import print_function
import sys
with open(sys.argv[1], 'w') as f1, open(sys.argv[2], 'w') as f2:
    for i in range(1000):
        if i % 2 == 0:
            print(i, file=f1)
        else:
            print(i, file=f2)

And here is the Python wrapper that invokes the program and combines its two outputs (interleaving 4 lines from each at a time).

#!/usr/bin/env python
from __future__ import print_function
from contextlib import contextmanager
import os
import shutil
import subprocess
import tempfile

@contextmanager
def named_pipe():
    """
    Create a temporary named pipe.

    Stolen shamelessly from StackOverflow:
    http://stackoverflow.com/a/28840955/459780
    """
    dirname = tempfile.mkdtemp()
    try:
        path = os.path.join(dirname, 'named_pipe')
        os.mkfifo(path)
        yield path
    finally:
        shutil.rmtree(dirname)

with named_pipe() as f1, named_pipe() as f2:
    cmd = ['./bogus.py', f1, f2]
    child = subprocess.Popen(cmd)
    with open(f1, 'r') as in1, open(f2, 'r') as in2:
        buff = list()
        for lines in zip(in1, in2):
            line1 = lines[0].strip()
            line2 = lines[1].strip()
            # print each line from file 1 immediately; buffer lines
            # from file 2 and flush them four at a time
            print(line1)
            buff.append(line2)
            if len(buff) == 4:
                for line in buff:
                    print(line)
                del buff[:]  # reset, or the buffer only flushes once
        for line in buff:  # flush any leftover buffered lines
            print(line)
    child.wait()  # reap the child process

Answer

I'm seeing big chunks of one file and then big chunks of the other file, regardless of whether I write to stdout, stderr, or tty.

If you can't make the child use line buffering for its files, then a simple way to read complete interleaved lines from the output files while the process is still running, as soon as output becomes available, is to use threads:

#!/usr/bin/env python2
from subprocess import Popen
from threading import Thread
from Queue import Queue  # the module is named queue on Python 3

def readlines(path, queue):
    try:
        with open(path) as pipe:  # blocks until the writer opens the fifo
            for line in iter(pipe.readline, ''):
                queue.put(line)
    finally:
        queue.put(None)  # sentinel: this reader is done

with named_pipes(n=2) as paths:
    child = Popen(['python', 'child.py'] + paths)
    queue = Queue()
    for path in paths:
        Thread(target=readlines, args=[path, queue]).start()
    for _ in paths:  # expect one None sentinel per reader thread
        for line in iter(queue.get, None):
            print line.rstrip('\n')

where named_pipes(n) is defined here.
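
The hyperlink behind "defined here" is not reproduced on this page. As a minimal sketch, assuming the linked definition simply generalizes the question's named_pipe() context manager to n pipes, it might look like this:

from contextlib import contextmanager
import os
import shutil
import tempfile

@contextmanager
def named_pipes(n=1):
    # Create n temporary named pipes in a scratch directory and remove
    # everything on exit (modeled on the question's named_pipe()).
    dirname = tempfile.mkdtemp()
    try:
        paths = [os.path.join(dirname, 'named_pipe' + str(i))
                 for i in range(n)]
        for path in paths:
            os.mkfifo(path)
        yield paths
    finally:
        shutil.rmtree(dirname)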

pipe.readline() is broken for non-blocking pipes on Python 2, which is why threads are used here.
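
On Python 3 the same thread-based approach still works, since blocking reads in threads sidestep the non-blocking readline() issue entirely. A minimal sketch of the port, assuming named_pipes() and child.py as above (only the Queue import and the print syntax change):

#!/usr/bin/env python3
# Sketch: the answer's reader ported to Python 3; assumes named_pipes()
# and child.py exist as described above.
from subprocess import Popen
from threading import Thread
from queue import Queue  # renamed from Queue in Python 2

def readlines(path, queue):
    try:
        with open(path) as pipe:
            for line in iter(pipe.readline, ''):
                queue.put(line)
    finally:
        queue.put(None)  # sentinel: this reader is done

with named_pipes(n=2) as paths:
    child = Popen(['python', 'child.py'] + paths)
    queue = Queue()
    for path in paths:
        Thread(target=readlines, args=[path, queue]).start()
    for _ in paths:  # one sentinel per reader thread
        for line in iter(queue.get, None):
            print(line, end='')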

To print a line from one file followed by a line from another:

with named_pipes(n=2) as paths:
    child = Popen(['python', 'child.py'] + paths)
    queues = [Queue() for _ in paths]  # one queue per output file
    for path, queue in zip(paths, queues):
        Thread(target=readlines, args=[path, queue]).start()
    while queues:
        for q in list(queues):  # iterate over a copy: queues may shrink
            line = q.get()
            if line is None:  # EOF
                queues.remove(q)
            else:
                print line.rstrip('\n')

If child.py writes more lines to one file than to the other, the difference is kept in memory, so the individual queues in queues may grow without bound until they fill all available memory. You can set a maximum number of items in each queue, but then you have to pass a timeout to q.get(), otherwise the code may deadlock.
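
For example, a minimal sketch of that bounded variant (an illustration, not from the original answer; the maxsize of 100 and the 10-second timeout are arbitrary placeholder values, and readlines() and named_pipes() are as defined above):

from Queue import Queue, Empty  # Python 2
from subprocess import Popen
from threading import Thread

with named_pipes(n=2) as paths:
    child = Popen(['python', 'child.py'] + paths)
    queues = [Queue(maxsize=100) for _ in paths]  # cap buffered lines per file
    for path, queue in zip(paths, queues):
        Thread(target=readlines, args=[path, queue]).start()
    while queues:
        for q in list(queues):  # iterate over a copy: queues may shrink
            try:
                line = q.get(timeout=10)  # don't block forever on one queue
            except Empty:
                continue  # no line yet; check the other queue
            if line is None:  # EOF
                queues.remove(q)
            else:
                print line.rstrip('\n')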

If you need to print exactly 4 lines from one output file, then exactly 4 lines from the other, and so on, you could slightly modify the given code example:

    while queues:
        # print 4 lines from one queue followed by 4 lines from another queue
        for q in list(queues):  # iterate over a copy: queues may shrink
            for _ in range(4):
                line = q.get()
                if line is None:  # EOF
                    queues.remove(q)
                    break
                else:
                    print line.rstrip('\n')

It won't deadlock, but it may eat all memory if the child process writes much more data into one file than into the other (only the difference is kept in memory, so as long as the files stay roughly in step the program supports arbitrarily large output files).
