Non-blocking read of a pipe while doing other things


Question

As a follow-up to "process hangs when writing large data to pipe", I need to implement a way for a parent process to read from a pipe that its child process is writing to, while doing other things until the child has completed.

More specifically, the parent is returning a response to a client over HTTP. The response consists of the string <PING/> sent repeatedly, followed by the string <DONE/> when it is done pinging, followed by the actual content. This is being done to keep the connection alive until the actual response is ready.

1) I'm mostly just looking for general feedback. Do you see any issues with this code?

2) Will I achieve my goal of a non-blocking read? In particular, once all of the currently available data has been read (but the writer is still writing more), will my code be able to move on from while ( my $line = <$pipe_reader>){? And will it work properly after the pipe has been closed but before the child terminates?

3) The documentation for IO::Select says that add() takes an IO::Handle object. I keep seeing IO::Handle everywhere, but I don't know how to determine whether a pipe created in this way counts as an IO::Handle object. perl -e "pipe(my $r, my $w); print(ref($r))" just gives me GLOB...

4) The Perl documentation for select (on which I assume IO::Select is based) warns:

WARNING: One should not attempt to mix buffered I/O (like read or readline) with select, except as permitted by POSIX, and even then only on POSIX systems. You have to use sysread instead.

Does this mean it is a problem to have $writer->write('<PING/>'); in the same loop?

pipe(my $pipe_reader, my $pipe_writer);
$pipe_writer->autoflush(1);

my $pid = fork;

if ( $pid ) {

    # parent
    close $pipe_writer;

    $s = IO::Select->new();
    $s->add($pipe_reader);

    my $response  = "";
    my $startTime = time;
    my $interval  = 25;
    my $pings     = 0;

    while ( waitpid(-1, WNOHANG) <= 0 ) {

        if ( time > $startTime + ($interval * $pings) ) {
            $pings++;
            $writer->write('<PING/>');
        }

        if ( $s->can_read(0) ) {

            while ( my $line = <$pipe_reader> ) {
                $response .= $line;
            }
        }
    };

    $writer->write('<DONE/>');
    $writer->write($response);
    close $pipe_reader;
    $writer->close();

} else {

    #child
    die "cannot fork: $!" unless defined $pid;
    close $pipe_reader;

    #...do writes here...

    close $pipe_writer;
}

Regarding $writer, it may be irrelevant to this question, but the overall solution follows the pattern in the second code sample here:

Since we aren't ready with the entire HTTP body yet, we return a callback to PSGI which gives us a $responder object. We give it just the HTTP status and content type, then it gives us a $writer to write the body later.

We use $writer in the above code to write our ping values and the eventual body. All of the above code is in the callback returned to PSGI, but I omitted that for brevity.

Recommended answer

The first issue here is that of the non-blocking operation. The other questions are addressed further down.

As you quote, with select (or IO::Select) one should not use buffered I/O, especially here, where you want non-blocking and non-buffered operation. The example program in this answer gets terribly confused if it reads with <>.

Note that "buffering" is a multi-layered business. Some of it can be turned on or off by a simple program instruction, some is far more difficult to mess with, and some is a matter of implementation. It is in the language, libraries, OS, and hardware. The least we can do is to use the recommended tools.
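To make the problem concrete, here is a minimal sketch (the variable names are illustrative and not from the question). Two lines are written to a pipe in one go, one of them is read with <>, and select is then asked whether anything more can be read.

```perl
use strict;
use warnings;
use IO::Select;

pipe(my $rd, my $wr) or die "pipe failed: $!";
my $sel = IO::Select->new($rd);

# Both lines land in the kernel's pipe buffer with a single write.
syswrite($wr, "one\ntwo\n") // die "write failed: $!";

# readline hands back one line, but PerlIO has pulled everything
# available into its own buffer, so the kernel pipe is now empty.
my $first = <$rd>;

# select only looks at the kernel side, so it reports nothing to read
# even though a complete line is waiting in the PerlIO buffer.
my @ready = $sel->can_read(0);

# This returns at once, served from the buffer select cannot see.
my $second = <$rd>;

print "ready handles after first readline: ", scalar(@ready), "\n";
print "second line was still there: $second";
```

A loop that trusts can_read here would leave the second line unread until more data arrives, which is the kind of confusion the warning is about.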

Thus read from the select-manipulated handle using sysread, not readline (which is what <> uses). It returns 0 on EOF, so one can test for when the writing end has been closed (when EOF gets sent).
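As a small illustration of those return values, the sketch below writes to a pipe within a single process, closes the writing end, and records what sysread returns each time. The names are illustrative only.

```perl
use strict;
use warnings;

pipe(my $rd, my $wr) or die "pipe failed: $!";
syswrite($wr, "hello") // die "write failed: $!";
close $wr;    # the reader sees EOF once the pending data is consumed

my @returns;
while (1) {
    my $rv = sysread $rd, my $chunk, 1024;
    die "Error reading: $!" unless defined $rv;   # undef signals an error
    push @returns, $rv;                           # number of bytes read
    last if $rv == 0;                             # 0 signals EOF
}
print "sysread returned: @returns\n";
```

The first call returns the number of bytes read and the next one returns 0, which is the signal that the writer has closed its end.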

use warnings;
use strict;
use feature 'say';

use Time::HiRes qw(sleep);
use IO::Select; 

my $sel = IO::Select->new;

pipe my $rd, my $wr;
$sel->add($rd); 

my $pid = fork // die "Can't fork: $!";

if ($pid == 0) {
    close $rd; 
    $wr->autoflush;
    for (1..4) {
        sleep 1;
        say "\tsending data";
        say $wr 'a' x (120*1024);
    }
    say "\tClosing writer and exiting";
    close $wr;
    exit; 
}   
close $wr;    
say "Forked and will read from $pid";

my @recd;
READ: while (1) {
    if (my @ready = $sel->can_read(0)) {  # beware of signal handlers
        foreach my $handle (@ready) {
            my $buff;
            my $rv = sysread $handle, $buff, 64*1024;
            if (not $rv) {  # error (undef) or closed writer (==0)
                if (not defined $rv) {
                    warn "Error reading: $!";
                }
                last READ;  # single pipe (see text)
            }
            say "Got ", length $buff, " characters";
            push @recd, length $buff; 
        }
    }
    else {
        say "Doing else ... ";
        sleep 0.5; 
    }
}   
close $rd;
my $gone = waitpid $pid, 0;
say "Reaped pid $gone";
say "Have data: @recd"

This assumes that the parent doesn't do a lot of processing in the else branch, since that would delay the pipe checks. If it does, you need to fork yet another process for those long jobs.
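Applied to the question's ping scenario, the same loop might look like the sketch below. It is only an outline under assumptions: read_with_pings is a hypothetical helper, $on_ping stands in for $writer->write('<PING/>'), and the short can_read timeout (instead of 0 plus a sleep) is my choice so that the loop does not spin.

```perl
use strict;
use warnings;
use IO::Select;
use Time::HiRes qw(time sleep);

# Hypothetical helper: collect everything from $rd, calling $on_ping
# every $interval seconds until the writer closes its end.
sub read_with_pings {
    my ($rd, $on_ping, $interval) = @_;
    my $sel = IO::Select->new($rd);
    my ($response, $next_ping) = ('', time);
    while (1) {
        if (time >= $next_ping) {
            $on_ping->();                 # e.g. $writer->write('<PING/>')
            $next_ping += $interval;
        }
        next unless $sel->can_read(0.1);  # wait briefly, then re-check the clock
        my $rv = sysread $rd, $response, 64*1024, length $response;
        die "Error reading: $!" unless defined $rv;
        last if $rv == 0;                 # writer closed the pipe
    }
    return $response;
}

pipe(my $rd, my $wr) or die "pipe failed: $!";
my $pid = fork;
die "Can't fork: $!" unless defined $pid;

if ($pid == 0) {                          # child: slow job, then one write
    close $rd;
    sleep 0.3;
    syswrite $wr, "actual content";
    close $wr;
    exit 0;
}

close $wr;
my $pings = 0;
my $body  = read_with_pings($rd, sub { $pings++ }, 0.1);
close $rd;
waitpid $pid, 0;                          # reap only after the pipe is drained
print "Sent $pings pings, got: $body\n";
```

The loop ends when the child closes the pipe rather than when it exits, and the child is reaped afterwards.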

Some comments

  • I ask for a lot of data from sysread, as that is the most efficient way to use it and as you expect big writes from the child. You can see from the prints (a sample is below) how that works out.

  • An undefined return from sysread indicates an error. The pipe may still be readable, and if we went back to sysread via the while loop we could end up in an infinite loop of errors, so we exit the loop. The read error might not happen the next time, but counting on that would risk an infinite loop.

  • On an exceptional return (writer closed, or error) the code exits the loop, as nothing more needs to be done here. But with more complex IPC (more pipes, all this inside another loop taking new connections, signal handlers, etc.) we'd need to remove the handle from the list of those being watched, and the handling of a read error would differ from that of a closed writer.

In this simple example the error handling is simple (really just last READ if not $rv;). But in general a read error is a different matter from an orderly closed writer, and the two are handled separately. (For one, on a read error we'd want to retry a fixed number of times.)

  • All data can be collected into $buff by passing length $buff as OFFSET, the fourth argument to sysread. Each read is then placed at the end of $buff, which gets extended.

my $rv = sysread $handle, $buff, 64*1024, length $buff;

In this case there is no need for @recd. This is a common way to collect data.
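A self-contained sketch of that accumulation pattern follows. It uses a deliberately small read size and a single-process pipe purely for illustration. Note that $buff is initialised to an empty string so that length $buff is 0 on the first read.

```perl
use strict;
use warnings;

pipe(my $rd, my $wr) or die "pipe failed: $!";
syswrite($wr, 'a' x 3000) // die "write failed: $!";   # small enough to fit in the pipe
close $wr;

my $buff  = '';     # defined from the start, so the first OFFSET is 0
my $reads = 0;
while (1) {
    # Each read is appended at the current end of $buff.
    my $rv = sysread $rd, $buff, 1024, length $buff;
    die "Error reading: $!" unless defined $rv;
    last if $rv == 0;
    $reads++;
}
print "Collected ", length $buff, " bytes in $reads reads\n";
```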

Signals are part and parcel of any IPC. A limited discussion follows.

"Safe signals" generally protect I/O from being interrupted by a signal. But select may be affected:

Note that whether select gets restarted after signals (say, SIGALRM) is implementation-dependent.

and thus handles that use it may not be safe either. In my experience can_read can return (false) when a SIGCHLD is handled by the program. This simple example is safe, for a few reasons:

  • If can_read returns empty because a signal is handled, the while loop brings it right back to that handle, which is still readable.

  • A signal can affect select if it comes while the program is blocked at select. But you have non-blocking operation, and the chance that the signal comes in right while select is checking handles is minuscule.

  • Finally, I don't know whether a SIGCHLD for a process that wrote to a pipe can affect select on the other end of that very pipe, but even if it can, the odds are astronomically small.

With more complex code (if can_read isn't directly in a loop like above), consider whether its false return (due to a signal) can affect the program flow. If that is a concern, add code to check false returns from can_read; if caused by a signal, $! is EINTR. This can be checked using %!, which loads Errno when used. So you can check whether can_read returned because of an interrupt with if $!{EINTR}. For instance:

if (my @ready = $sel->can_read(0)) {
   ...
}
elsif ($!{EINTR}) { 
   # interrupted by signal, transfer control as suitable
   next READ;
}

Again, the program above returns to the while loop promptly anyway (on the assumption that the else block isn't meant for long-running jobs, for which there should be another process).
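Where can_read is not sitting directly in a loop, one option is to wrap the EINTR check in a small helper. The sketch below is only that: can_read_retry is a hypothetical name, and it restarts the full timeout after an interruption, which may or may not be what a given program wants.

```perl
use strict;
use warnings;
use IO::Select;

# Hypothetical wrapper: repeat can_read if a signal interrupted it.
sub can_read_retry {
    my ($sel, $timeout) = @_;
    while (1) {
        $! = 0;                               # clear any stale error first
        my @ready = $sel->can_read($timeout);
        return @ready if @ready;
        return unless $!{EINTR};              # genuine timeout (or another error)
        # Interrupted by a signal: go round again with the full timeout.
    }
}

pipe(my $rd, my $wr) or die "pipe failed: $!";
my $sel = IO::Select->new($rd);

my @before = can_read_retry($sel, 0);         # nothing has been written yet
syswrite($wr, "x") // die "write failed: $!";
my @after  = can_read_retry($sel, 0);         # now the handle is readable

print "ready before write: ", scalar(@before),
      ", after write: ", scalar(@after), "\n";
```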

A different matter is the SIGPIPE signal, which by default kills the program. Since you are dealing with pipes it is only prudent to handle it, by installing a signal handler

    $SIG{PIPE} = \&handle_sigpipe;

where sub handle_sigpipe can do what the program needs. For example, it can set global variables used to check the validity of the pipe, so that once it has raised an error we don't try to read from or write to it again. The very fact that we have assigned to $SIG{PIPE} protects the program from that signal. However, unless it is set to 'IGNORE', can_read needs to be restarted as discussed above. See the follow-up post.
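A minimal sketch of such a handler is shown here. The flag $pipe_broken is a hypothetical name for the kind of global mentioned above, and the broken pipe is provoked on purpose by closing the reading end before writing.

```perl
use strict;
use warnings;

my $pipe_broken = 0;                       # hypothetical flag checked elsewhere
sub handle_sigpipe { $pipe_broken = 1 }
$SIG{PIPE} = \&handle_sigpipe;             # without this, SIGPIPE kills the program

pipe(my $rd, my $wr) or die "pipe failed: $!";
close $rd;                                 # no reader is left

my $rv = syswrite $wr, "data";             # raises SIGPIPE and the write fails
my $was_epipe = $!{EPIPE} ? 1 : 0;         # capture the error right away

print "write failed with EPIPE=$was_epipe, handler ran=$pipe_broken\n";
```

With the handler installed the program survives, and both the failed write and the flag can be used to stop further I/O on that pipe.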

Comments on the question

  • Your code fragment won't be able to "move on" as intended, since it uses <> to read. (Besides, you put a while loop over <> there, which does block. So once it reads what is available it would sit and wait until more comes. You want a single read instead, but again not with <>.)

  • Every filehandle is an IO::Handle (or IO::File) object, or at least gets blessed into those classes on demand. See (the second part of) this post.

  • The warning about not mixing buffered I/O with select relates to the filehandles that select is used on. While it is crucial for the pipe, writing to that other service with $writer is unrelated.

  • Code comment: there is no need to condition all work on the child's exit. You need to watch for when the child closes the pipe. Reap the process (collect the signal) later.
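On the IO::Handle point in the list above, a quick check along the following lines may help. It is a sketch with illustrative variable names; IO::File is loaded explicitly so that the class relationship does not depend on on-demand loading.

```perl
use strict;
use warnings;
use IO::File;       # also loads IO::Handle
use IO::Select;

pipe(my $r, my $w) or die "pipe failed: $!";

my $ref_type  = ref $r;                              # what the question printed
my $io_obj    = *{$r}{IO};                           # the IO object inside the glob
my $is_handle = $io_obj->isa('IO::Handle') ? 1 : 0;

$w->autoflush(1);                                    # method call on a plain pipe handle

my $sel = IO::Select->new;
$sel->add($r);                                       # accepted without any conversion

printf "ref=%s io_class=%s isa_IO::Handle=%d watched=%d\n",
    $ref_type, ref $io_obj, $is_handle, $sel->count;
```

So although ref reports GLOB for the handle itself, the IO object behind it belongs to the IO::Handle family, and IO::Select accepts the pipe handle as it is.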

Another way to handle similar needs is to do each part of the work in its own fork, so the 'keep-alive' for your HTTP connection would run in yet another separate process. Then all child processes can be managed more simply by the parent, by communicating over a socketpair.
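For reference, a bare-bones socketpair exchange between a parent and one child could look like this. It is a sketch only: the one-line "ping"/"pong" protocol and the variable names are assumptions made for illustration, and plain readline is fine here because select is not involved.

```perl
use strict;
use warnings;
use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);
use IO::Handle;

# One bidirectional channel: each process keeps one end.
socketpair(my $to_child, my $to_parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
    or die "socketpair failed: $!";
$_->autoflush(1) for $to_child, $to_parent;

my $pid = fork;
die "Can't fork: $!" unless defined $pid;

if ($pid == 0) {                       # child: answer one request and leave
    close $to_child;
    my $request = <$to_parent>;
    print {$to_parent} "pong: $request";
    close $to_parent;
    exit 0;
}

close $to_parent;                      # parent keeps only its own end
print {$to_child} "ping\n";
my $reply = <$to_child>;
close $to_child;
waitpid $pid, 0;
print "Child replied: $reply";
```

Unlike a pipe, the same pair of handles carries traffic in both directions, which is what makes managing several workers from the parent simpler.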

See this post for a comparison of read and sysread that includes many relevant points.

The code above prints


Forked and will read from 4171
Doing else ... 
Doing else ... 
Doing else ... 
        sending data
Got 65536 characters
Got 57345 characters
Doing else ... 
Doing else ... 
        sending data
Got 65536 characters
Got 57345 characters
Doing else ... 
Doing else ... 
        sending data
Doing else ... 
Got 65536 characters
Got 40960 characters
Got 16385 characters
Doing else ... 
Doing else ... 
        sending data
Got 65536 characters
Got 24576 characters
        Closing writer and exiting
Got 32769 characters
Doing else ... 
Reaped pid 4171
Have data: 65536 57345 65536 57345 65536 40960 16385 65536 24576 32769
