如何等待子进程在父进程中设置变量? [英] How to wait for child process to set variable in parent process?

查看:102
本文介绍了如何等待子进程在父进程中设置变量?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

 use Parallel::ForkManager;    
 my $number_running = 0;
 my $pm = new Parallel::ForkManager(30); 
 $pm->run_on_start( sub { ++$number_running; } );
 $pm->run_on_finish( sub { --$number_running; } );
 for (my $i=0; $i<=100; $i++)
 {
     if ($number_running == 5) { while ($number_running > 0) {} }  # waits forever
     $pm->start and next;
     print $i;
     $pm->finish;
 }

以上代码使用 Parallel :: ForkManager 使用并行进程在for循环中执行代码.它计算正在运行的子进程数,并相应地设置$number_running变量.一旦有5个子进程运行,我希望它等到0个子进程运行后再继续.

for循环中的第一行旨在实现这一点,但它永远在该行上等待.就像子进程对变量所做的更改不适用于该行代码.我究竟做错了什么?注意:我知道wait_all_children,但是我不想使用它.

解决方案

 回调run_on_finish通常不会为每个孩子的退出而触发,因此$number_running不会被减少,因此无法控制循环.解决方法:

  • 使用reap_finished_children以便在每个孩子退出时进行交流,因此run_on_finish实际上在每个孩子退出时都可以运行

  • 使用wait_for_available_procs等待整个批次完成,然后再开始新批次

至于标题,子进程不能在父级中设置任何内容(子进程也不能在父级中设置).他们必须按照本模块中为此目的概述的方式进行交流,以采取一致的行动.


回调run_on_start与每个新进程一起运行,并且计数器递增.但是回调run_on_finish永远不会触发,因此计数器永远不会递减.因此,一旦到达5,代码就会位于while循环中.请注意,父级和子级是独立的进程,因此他们彼此之间都不了解变量,因此无法更改它们.

回调run_on_finish通常是在所有进程分叉之后通过具有wait_all_children来触发的.它的工作也完成了 当最大进程数运行并且一个退出时.这是在start中通过调用wait_one_child(调用on_finish,请参见下文)完成的.

或者,可以通过调用 reap_finished_children 方法来完成此操作

这是一个非阻塞调用,用于获得子代并执行独立于对startwait_all_children的调用的回调.在不经常调用start但您希望快速执行回调的情况下使用此选项.

这解决了主要问题,即如何在个别孩子退出时进行沟通(如注释中所述),而不是wait_all_children.

这里是一个如何使用它的示例,以使回调在子项退出时立即运行.大部分代码仅用于诊断(打印).

use warnings;
use strict;
use feature 'say';
use Parallel::ForkManager;    
$| = 1;

my $total_to_process = 3;  # only a few for this test
my $number_running   = 0;    
my @ds;

my $pm = Parallel::ForkManager->new(30);

$pm->run_on_start( sub {
    ++$number_running;
    say "Started $_[0], total: $number_running";
});
$pm->run_on_finish( sub {
    --$number_running;
    my ($pid, $code, $iden, $sig, $dump, $rdata) = @_;
    push @ds, "gone-$pid";
    say "Cleared $pid, ", ($rdata->[0] // ''), ($code ? " exit $code" : '');
});

foreach my $i (1 .. $total_to_process)
{
    $pm->start and next;
    run_job($i);
    $pm->finish(10*$i, [ "kid #$i" ]);
}
say "Running: ", map { "$_ " } $pm->running_procs;  # pid's of children

# Reap right as each process exits, retrieve and print info
my $curr = $pm->running_procs;
while ($pm->running_procs) 
{
    $pm->reap_finished_children;    # may be fewer now
    if ($pm->running_procs < $curr) {
        $curr = $pm->running_procs;
        say "Remains: $number_running. Data: @ds";
    }
    sleep 1;  # or use Time::HiRes::sleep 0.1;
}

sub run_job {
    my ($num) = @_;
    my $sleep_time = ($num == 1) ? 1 : ($num == 2 ? 10 : 20);
    sleep $sleep_time;
    say "\tKid #$num slept for $sleep_time, exiting";
}

使用此方法等效于在fork之后的循环中调用waitpid -1, POSIX::WNOHANG.此派生少于max(30)进程,以更轻松地查看输出并演示回调在子退出时正确运行.更改这些数字以查看其完整操作.

我们以10*$i退出,以便跟踪输出中的子级.匿名数组[...]中返回的数据是描述性字符串.在回调中,一旦reap_finished_children完成,就会减小$number_running.这就是为什么我们再次使用$curr变量进行诊断的原因.

此打印

start: Started 4656, running: 1
start: Started 4657, running: 2
start: Started 4658, running: 3
Running: 4656 4658 4657 
        Kid #1 slept for 1, exiting
Cleared 4656, kid #1 exit 10
Remains: 2. Data: gone-4656
        Kid #2 slept for 10, exiting
Cleared 4657, kid #2 exit 20
Remains: 1. Data: gone-4656 gone-4657
        Kid #3 slept for 20, exiting
Cleared 4658, kid #3 exit 30
Remains: 0. Data: gone-4656 gone-4657 gone-4658


直接的问题是如何在开始新批次之前等待整个批次完成.可以直接通过 wait_for_available_procs($ n)完成.

等待,直到$n个可用的处理插槽可用为止.如果未提供$n,则默认为 1 .

如果将$MAX用于$n,则只有在整个批次完成后,许多插槽才可用. $n的用法也可以在运行时确定.


一些模块操作的详细信息

当孩子退出时,SIGCHLD信号会发送给父母,为了知道孩子已经离开(必须首先避免僵尸),必须捕捉到该信号.这可以通过在代码中或在SIGCHLD处理程序中(但仅在一个位置)使用waitwaitpid来完成.参见叉子 waitpid 等待.

我们从 P :: FM的源中看到这是在wait_one_child中完成的(通过_waitpid子程序)

sub wait_one_child { my ($s,$par)=@_;  
  my $kid;
  while (1) {
    $kid = $s->_waitpid(-1,$par||=0);
    last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs
    redo if !exists $s->{processes}->{$kid};
    my $id = delete $s->{processes}->{$kid};
    $s->on_finish( $kid, $? >> 8 , $id, $? & 0x7f, $? & 0x80 ? 1 : 0);
    last;
  }
  $kid;
};  

wait_all_children

中使用的

sub wait_all_children { my ($s)=@_;
  while (keys %{ $s->{processes} }) {
    $s->on_wait;
    $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
  };
}

上面使用的方法reap_finished_children是该方法的同义词.

start使用信号获取方法wait_one_child来获取最大进程数并退出一个子进程时获得子进程.这就是模块知道何时可以启动另一个进程并尊重其最大数量的方式. (它还被其他一些等待进程的例程使用. ).这就是$s->on_finish( $kid, ... )

触发run_on_finish的时间

sub on_finish {
  my ($s,$pid,@par)=@_;
  my $code=$s->{on_finish}->{$pid} || $s->{on_finish}->{0} or return 0;
  $code->($pid,@par);
};

该回调位于coderef $code中,可从对象的on_finish键中检索该键,该键本身在子run_on_finish中设置.一旦该子程序运行,就可以通过这种方式设置回调.

为此用户可用的方法是wait_all_childrenreap_finished_children.

由于在发布的代码中未使用任何内容,因此$number_running不会得到更新,因此while是无限循环.回想一下,父进程中的变量$number_running不能由子进程直接更改.

 use Parallel::ForkManager;    
 my $number_running = 0;
 my $pm = new Parallel::ForkManager(30); 
 $pm->run_on_start( sub { ++$number_running; } );
 $pm->run_on_finish( sub { --$number_running; } );
 for (my $i=0; $i<=100; $i++)
 {
     if ($number_running == 5) { while ($number_running > 0) {} }  # waits forever
     $pm->start and next;
     print $i;
     $pm->finish;
 }

The above code uses Parallel::ForkManager to execute code in a for loop using parallel processes. It is counting how many child processes are running and setting the $number_running variable accordingly. Once 5 child processes are running, I would like it to wait until 0 child processes are running before continuing.

The first line in the for loop is designed to achieve this but it waits forever on that line. It's like the change to the variable made by the child processes is not available to that line of code. What am I doing wrong? Note: I am aware of wait_all_children but I don't want to use it.

解决方案

Short   The callback run_on_finish normally doesn't get triggered for every child's exit, so $number_running doesn't get reduced and thus it can't control the loop. Ways to fix this:

  • use reap_finished_children in order to communicate as individual children exit, so thatrun_on_finish indeed gets to run as each child exits

  • use wait_for_available_procs to wait for the whole batch to finish before starting a new one

As for the title, a child process cannot set anything in the parent (nor could parent in the child). They must communicate to accord actions, in a way outlined above for this porpose in this module.


The callback run_on_start runs with every new process and the counter is incremented. But the callback run_on_finish is never triggered so the counter is never decremented. Thus once it reaches 5 the code sits in the while loop. Note that a parent and children are separate processes which thus don't know about each other's variables and cannot change them.

The callback run_on_finish is commonly triggered by having wait_all_children after all processes were forked. Its job is also done when maximum number of processes run and one exits. This is done in start by a call to wait_one_child (which calls on_finish, see below).

Or, this can be done at will by calling reap_finished_children method

This is a non-blocking call to reap children and execute callbacks independent of calls tostart or wait_all_children. Use this in scenarios where start is called infrequently but you would like the callbacks executed quickly.

This resolves the main concern of how to communicate as individual children exit (as clarified in comments), and not by wait_all_children.

Here is an example of how to use it so that the callback runs right as a child exits. A good deal of the code is merely for diagnostics (prints).

use warnings;
use strict;
use feature 'say';
use Parallel::ForkManager;    
$| = 1;

my $total_to_process = 3;  # only a few for this test
my $number_running   = 0;    
my @ds;

my $pm = Parallel::ForkManager->new(30);

$pm->run_on_start( sub {
    ++$number_running;
    say "Started $_[0], total: $number_running";
});
$pm->run_on_finish( sub {
    --$number_running;
    my ($pid, $code, $iden, $sig, $dump, $rdata) = @_;
    push @ds, "gone-$pid";
    say "Cleared $pid, ", ($rdata->[0] // ''), ($code ? " exit $code" : '');
});

foreach my $i (1 .. $total_to_process)
{
    $pm->start and next;
    run_job($i);
    $pm->finish(10*$i, [ "kid #$i" ]);
}
say "Running: ", map { "$_ " } $pm->running_procs;  # pid's of children

# Reap right as each process exits, retrieve and print info
my $curr = $pm->running_procs;
while ($pm->running_procs) 
{
    $pm->reap_finished_children;    # may be fewer now
    if ($pm->running_procs < $curr) {
        $curr = $pm->running_procs;
        say "Remains: $number_running. Data: @ds";
    }
    sleep 1;  # or use Time::HiRes::sleep 0.1;
}

sub run_job {
    my ($num) = @_;
    my $sleep_time = ($num == 1) ? 1 : ($num == 2 ? 10 : 20);
    sleep $sleep_time;
    say "\tKid #$num slept for $sleep_time, exiting";
}

Use of this method is equivalent to calling waitpid -1, POSIX::WNOHANG in a loop after fork. This forks fewer than the max (30) processes to see output more easily and demonstrate that the callback runs right as a child exits. Change these numbers to see its full operation.

We exit with 10*$i, so to track children in output. The data returned in an anonymous array [...] is a descriptive string. As soon as reap_finished_children completes the $number_running is reduced, in the callback. This is why we have the $curr variable, again for diagnostics.

This prints

start: Started 4656, running: 1
start: Started 4657, running: 2
start: Started 4658, running: 3
Running: 4656 4658 4657 
        Kid #1 slept for 1, exiting
Cleared 4656, kid #1 exit 10
Remains: 2. Data: gone-4656
        Kid #2 slept for 10, exiting
Cleared 4657, kid #2 exit 20
Remains: 1. Data: gone-4656 gone-4657
        Kid #3 slept for 20, exiting
Cleared 4658, kid #3 exit 30
Remains: 0. Data: gone-4656 gone-4657 gone-4658


The direct question is of how to wait for the whole batch to finish before starting a new one. This can be done directly by wait_for_available_procs($n)

Wait until $n available process slots are available. If $n is not given, defaults to 1.

If $MAX is used for $n, that many slots will become available only once the whole batch completed. What to use for $n can also be decided at runtime.


Some details of module's operation

When a child exits the SIGCHLD signal is sent to the parent, which it must catch in order to know that the child is gone (and to avoid zombies, in the first place). This is done by using wait or waitpid, in code or in the SIGCHLD handler (but only at one place). See fork, Signals in perlipc, waitpid and wait.

We see from P::FM's source that this is done in wait_one_child (via _waitpid sub)

sub wait_one_child { my ($s,$par)=@_;  
  my $kid;
  while (1) {
    $kid = $s->_waitpid(-1,$par||=0);
    last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs
    redo if !exists $s->{processes}->{$kid};
    my $id = delete $s->{processes}->{$kid};
    $s->on_finish( $kid, $? >> 8 , $id, $? & 0x7f, $? & 0x80 ? 1 : 0);
    last;
  }
  $kid;
};  

which is used in wait_all_children

sub wait_all_children { my ($s)=@_;
  while (keys %{ $s->{processes} }) {
    $s->on_wait;
    $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
  };
}

The method reap_finished_children used above is a synonym for this method.

The method wait_one_child that gets the signal is used by start to reap child processes when maximum number of processes is filled and one exits. This is how the module knows when it can start another process and respect its maximum. (It is also used by a few other routines that wait for processes. ). And this is when run_on_finish gets triggered, by $s->on_finish( $kid, ... )

sub on_finish {
  my ($s,$pid,@par)=@_;
  my $code=$s->{on_finish}->{$pid} || $s->{on_finish}->{0} or return 0;
  $code->($pid,@par);
};

The callback is in the coderef $code, retrieved from the object's on_finish key, which itself is set in the sub run_on_finish. This is how the callback is set up, once that sub runs.

The methods availed to the user for this are wait_all_children and reap_finished_children.

Since none of this is used in the posted code the $number_running is not getting updated so while is an infinite loop. Recall that the variable $number_running in the parent cannot be directly changed by child processes.

这篇关于如何等待子进程在父进程中设置变量?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆