将回调转换为流 [英] Transform callbacks into a stream

查看:94
本文介绍了将回调转换为流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在Perl中,如何将需要回调的函数转换为返回结果流的新函数?

In Perl, how can one transform a function that requires callbacks to a new function that returns a stream of results?

图片我有一个固定的功能,我无法更改:

Image I have a fixed function I can't change:

sub my_fixed_handler {
    my $callback = shift;

    my $count = 1;
    while(1) {
       $callback->($count++);
    }
}

要打印所有数字,我可以轻松编写以下代码:

To print all the a count of numbers I could easily write this code:

my_fixed_handler( sub {
    my $num = shift;
    print "...$num\n";
});

但是现在我需要基于my_fixed_handler的另一个函数,该函数将仅返回一个计算步骤的结果:

But now I need another function based on on the my_fixed_handler that will return only the result of one calculation step:

my $stream = my_wrapper( my_fixer_hander( ... ) ) ;
$stream->next;  # 1
$stream->next;  # 2

这可能吗?

推荐答案

使用管道在已满时阻塞的事实:在派生进程中运行fixed_handler,回调通过管道写回父级.当父进程在读取后​​进行处理时,如果管道已满且编写器正在等待,则管道将被阻塞.为了便于执行此操作,请编写一个额外的空字符串以填充管道.

Use the fact that the pipe blocks when full: Run the fixed_handler in a forked process, with the callback writing back to the parent via a pipe. While the parent is processing after a read the pipe is blocked if full and the writer is waiting. To facilitate this write an extra empty string to fill the pipe.

use warnings;
use strict;
use feature 'say';

sub fixed_handler {
    my $callback = shift;
    #state $count = 1; # would solve the problem
    my $count = 1;
    for (1..4) { $callback->($count++) }   
}

pipe my $reader, my $writer  or die "Can't open pipe: $!";
$writer->autoflush(1);
$reader->autoflush(1);

my $fill_buff = ' ' x 100_000;  # (64_656 - 3); # see text

my $iter = sub { 
    my $data = shift; 
    say "\twrite on pipe ... ($data)";
    say $writer $data;
    say $writer $fill_buff;     # (over)fill the buffer
};

my $pid = fork // die "Can't fork: $!";  #/

if ($pid == 0) {
    close $reader;
    fixed_handler($iter);
    close $writer;
    exit;
}

close $writer;
say "Parent: started kid $pid";

while (my $recd = <$reader>) {
    next if $recd !~ /\S/;      # throw out the filler
    chomp $recd;
    say "got: $recd";
    sleep 1;
}

my $gone = waitpid $pid, 0;
if    ($gone > 0) { say "Child $gone exited with: $?" }
elsif ($gone < 0) { say "No such process: $gone" }

输出


Parent: started kid 13555
        write on pipe ... (1)
got: 1
        write on pipe ... (2)
got: 2
        write on pipe ... (3)
got: 3
        write on pipe ... (4)
got: 4
Child 13555 exited with: 0

起初,作家将继续打印直到填满缓冲区.然后,当读者得到一行时,作家将放另一行(如果打印长度不同,则放两行),依此类推. .如果可以,请删除say $writer $fill_buff;.然后在输出中,我们首先看到所有write on pipe行,然后是父级打印.如今,常见的缓冲区大小为64K.

At first the writer would keep printing until it fills the buffer. Then, as the reader gets one line, the writer puts another (or two, if prints' length vary), etc. If this is OK, remove say $writer $fill_buff;. Then in the output we see all write on pipe lines first, then parent's prints go. A common buffer size nowadays is 64K.

但是,我们被告知file_handler的每个步骤都需要时间,因此在父级中开始处理之前,我们将等待数千个此类步骤(具体取决于每次写入的大小),直到缓冲区填满并每次读取时,作家都会开始受阻.

However, we are told that each step of file_handler takes time and so we'd wait for thousands of such steps before starting the processing in the parent (depending on the size of each write), until the buffer fills and the writer starts getting blocked at each read.

一种解决方法是编写一个额外的字符串,其长度足以填充缓冲区,然后在读取器中将其关闭.我发现它很难做到这一点.首先,在程序中找到的缓冲区是由

One way out of this is to write an extra string, long enough to fill the buffer, and dismiss it in the reader. I found it finicky to get the exact length for this though. For one, the buffer found in the program by

my $cnt; while (1) { ++$cnt; print $writer ' '; print "\r$cnt" } # reader sleeps

与以类似方式在命令行上找到的不同.即使这样,我仍然(有时)会得到双重写入".尽管 that 可能不是表演的制胜法宝,但我还是选择了100K来确保将其填满.

differs from the one found on the command line in a similar way. Even with this I still (sometimes) get "double writes." While that may not be a show stopper, I went with 100K to make sure to fill it.

例如,有关缓冲区大小的讨论,请参见这篇文章.

See this post for discussion of buffer sizes, for example.

另一种方法可能是使用IO::Handle::setvbuf设置管道的缓冲区大小.但是,我遇到了"未在此体系结构上实现"(在生产机器上),因此我不认为这一点.

Another way may be to set the pipe's buffer size using IO::Handle::setvbuf. However, I ran into "Not implemented on this architecture" (on production machines) and so I wouldn't consider that.

缺少缓冲当然会大大降低通信速度.

Messing with buffering will of course slow down the communication a lot.

这实现了 melpomene 的评论中的想法.

This implements the idea from melpomene's comments.

使用缓冲区",我指的是管道的缓冲区(如果在另一侧没有读取数据,则在管道块之前写入的数据量).那是其他涉及到的缓冲区,但在这里并不重要.

With "buffer" I refer to the pipe's buffer (the amount of data written before the pipe blocks, if no data is read on the other side). That are other buffers that get involved but are not as relevant here.

这篇关于将回调转换为流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆