Parent doesn't wait for child processes to finish despite reaping

Problem description

I am fully aware that there are tonnes of articles explaining the inner workings of parent-child process dynamics. I have gone through them and got my stuff working almost the way I want it to. But there is one thing that is bugging me, and I am not able to understand it despite multiple tries.

Problem: Despite reaping the children, main is not waiting for all children to finish and exits prematurely. I believe I did make a proper exit from the child process and I have installed the REAPER in the child process - so how is main exiting before the child finishes?

I am not looking for a solution here, but I need a new direction in which I could bang my head for the next week. As of now, I feel I have exhausted my options and tried a lot of things, but to no avail.

Some background on what I am trying to achieve:

All in all, I want all the children to finish, and only then do I want to proceed further. Each child process spawns a bunch of threads, and those threads are properly joined by that child process, which then exits with exit(0).

The additional hoopla you might observe in the program comes from our requirement: we have to hit 5 APIs (engines), but only with a fixed batch size at a time, say 10 for each. I launch a child process for each engine and a thread for each request. Then I wait for all threads to finish and join them, and only then does the child process exit. Only after that can I deposit the next batch of requests to the same engine. I do this for all engines until I exhaust my total requests, say 10000.

Each request may take anywhere between 1 second and 2 hours; basically, they are CSV reports being fetched from an HTTP API.

My issue is that once I have exhausted my total set of requests, I am not able to make MAIN wait until all child processes have finished. This is weird, and it is the issue I am trying to tackle.

Any ideas?

Output of my program:

[compuser@lenovoe470:little-stuff]$  perl 07--20190526-batch-processing-using-threads-with-busy-pool-detection-2.pl 12
26710: STARTING TASKS IN BATCHES
26710: RUNNING batch_engine 1_e1 tasks (1 2)
26710: RUNNING batch_engine 2_e2 tasks (3 4)
26710: RUNNING batch_engine 3_e3 tasks (5 6 7)
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710: BUSY_ENGINE: e3.
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710:26712: TASK_ORCHESTRATOR: >> finished batch_engine (2_e2) tasks (3 4)
26710: PID (26712) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e3.
26710:26713: TASK_ORCHESTRATOR: >> finished batch_engine (3_e3) tasks (5 6 7)
26710:26711: TASK_ORCHESTRATOR: >> finished batch_engine (1_e1) tasks (1 2)
26710: PID (26713) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e1.
26710: PID (26711) has finished with status (0). updating proc hash
26710: RUNNING batch_engine 4_e2 tasks (8 9)
26710: RUNNING batch_engine 5_e3 tasks (10 11 12)
26710: FINISHED TASKS IN BATCHES
[compuser@lenovoe470:little-stuff]$  1:26722: TASK_ORCHESTRATOR: >> finished batch_engine (5_e3) tasks (10 11 12)
1:26721: TASK_ORCHESTRATOR: >> finished batch_engine (4_e2) tasks (8 9)

In the output above:

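  • RUNNING batch_engine means I am running a numbered batch of tasks.
  • BUSY_ENGINE means the endpoint/engine is busy because it is already processing its maximum batch size of requests, so I need to wait.
  • finished batch_engine means the child process has finished processing the given batch of requests for a particular engine/endpoint. It exits, and main detects that the engine is available again and that the next batch can be queued.
  • The last two lines show that the child processes' output has spilled over, and main exited prematurely without waiting for the running children. Why? Any help?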

My program:

#!/usr/bin/env perl

use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use Thread qw(async);


STDOUT->autoflush(1);


# doesn't work
  sub reaper {
    my $reaped;
    while (($reaped = waitpid (-1,&WNOHANG) > 0)) {
      print "$$: reaped: $reaped\n";
      sleep(1);
    }
    $SIG{CHLD} = \&reaper;
  }
# doesn't work


my @total_tasks = (1 .. shift || 9);
my @engines = (qw/e1 e2 e3/);
my $sizes = { e1 => 2, e2 => 2, e3 => 3, };

my $proc_hash;
my $global_string = "ENGINE";

# source: https://duyanghao.github.io/ways_avoid_zombie_process/
#
  sub REAPER {
    local ($!, $?);
    while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
      if ( WIFEXITED($?) ) 
      {
        # my
        my $ret_code = WEXITSTATUS($?);
        print "$$: PID ($reaped_pid) has finished with status ($ret_code). updating proc hash\n";
        my $engine_name = $proc_hash->{$reaped_pid};
        delete ($proc_hash->{$reaped_pid});
        delete ($proc_hash->{$engine_name});
        # my

        # original
        #my $ret_code = WEXITSTATUS($?);
        #print "child process:$pid exit with code:$ret_code\n";
        # original
      }
    }
  }
#

$SIG{CHLD} = \&REAPER;

sub random_sleep_time {
  return (int(rand(5)+1))
  #return (sprintf "%.2f",(rand(1)+1))
}

sub task_runner {
  my @args = @_;
  my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
  STDOUT->autoflush(1);
  my $task_time = random_sleep_time();
  sleep ($task_time);
  threads->exit(0);
  #print "$$:".(threads->tid()).": TASK_RUNNER: $global_string ($batch_engine) task ($task) finished in $task_time seconds\n";
  #return;
};

sub task_orchestrator {
  my ($batch_engine, @tasks) = @_;
  my $engine = (split (/_/,$batch_engine))[1];
  my $task_orch_pid = fork();
  die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;

  if ($task_orch_pid != 0) {
    $proc_hash->{$engine} = $task_orch_pid;
    $proc_hash->{$task_orch_pid} = $engine;
  }

  if ($task_orch_pid == 0) {
    STDOUT->autoflush(1);
    my @tids;
    for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
    foreach my $task_number (0 .. $#tasks) { 
      $tids [$task_number] = threads->create (
        \&task_runner,[$batch_engine,$tasks [$task_number]]
      );
    }
    my $ppid = getppid();
    foreach my $tid (@tids) {$tid->join()}
    print "$ppid:$$: TASK_ORCHESTRATOR: >> finished batch_engine ($batch_engine) tasks (@tasks)\n";
    exit (0);
  }
}

sub update_proc_hash {
  my $finished_pid = waitpid (-1, POSIX->WNOHANG);
  if ($finished_pid > 0) {
    print "$$: PID ($finished_pid) has finished. updating proc hash\n";
    my $engine_name = $proc_hash->{$finished_pid};
    delete ($proc_hash->{$finished_pid});
    delete ($proc_hash->{$engine_name});
  }
}

my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks) {
  foreach my $engine (@engines) {
    update_proc_hash();
    if (exists $proc_hash->{$engine}) {
      print "$$: BUSY_ENGINE: $engine.\n";
      sleep (1);
      next;
    }
    else {
      my @engine_tasks;
      my $engine_max_tasks = $sizes->{$engine};
      while ($engine_max_tasks-- != 0) {
        my $task = shift @total_tasks;
        push (@engine_tasks,$task) if $task;
      }
      if (@engine_tasks) {
        my $batch_engine = $batch.'_'.$engine;
        print "$$: RUNNING batch_engine $batch_engine tasks (@engine_tasks)\n";
        task_orchestrator ("$batch_engine",@engine_tasks);
        $batch++;
      }
    }
  }
}

REAPER();

print "$$: FINISHED TASKS IN BATCHES\n";

__END__

Update after 3 days: Thank you, SO community. Once again, I am grateful to all of you who took the time to look into this and helped spot and correct the problem. Thank you so much.

Allow me to share the new output with the final program for everyone's reference.

Output after applying the fix:

User@Host:/cygdrive/c/bash-home> perl test.pl
22044: STARTING TASKS IN BATCHES
22044: MAIN: engine (e1) is RUNNING batch #1 tasks: (1 2)
22044: MAIN: engine (e2) is RUNNING batch #2 tasks: (3 4 5)
22044: MAIN: engine (e3) is RUNNING batch #3 tasks: (6 7)
41456: TASK_RUNNER: engine (e1) finished batch #1 task #1 in (1.80) seconds
41456: TASK_RUNNER: engine (e1) finished batch #1 task #2 in (1.31) seconds
41456: TASK_ORCHESTRATOR: engine (e1) finished batch #1 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (41456) has finished with status (0).
18252: TASK_RUNNER: engine (e2) finished batch #2 task #3 in (1.04) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #4 in (1.91) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #5 in (1.63) seconds
18252: TASK_ORCHESTRATOR: engine (e2) finished batch #2 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (18252) has finished with status (0).
14544: TASK_RUNNER: engine (e3) finished batch #3 task #6 in (1.42) seconds
14544: TASK_RUNNER: engine (e3) finished batch #3 task #7 in (1.84) seconds
14544: TASK_ORCHESTRATOR: engine (e3) finished batch #3 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (14544) has finished with status (0).
22044: MAIN: engine (e1) is RUNNING batch #4 tasks: (8 9)
22044: MAIN: engine (e2) is RUNNING batch #5 tasks: (10)
37612: TASK_RUNNER: engine (e1) finished batch #4 task #8 in (1.19) seconds
37612: TASK_RUNNER: engine (e1) finished batch #4 task #9 in (1.31) seconds
37612: TASK_ORCHESTRATOR: engine (e1) finished batch #4 tasks in (1.00) seconds.
16300: TASK_RUNNER: engine (e2) finished batch #5 task #10 in (1.53) seconds
16300: TASK_ORCHESTRATOR: engine (e2) finished batch #5 tasks in (1.00) seconds.
22044: ALL ORCHESTRATORS HAVE FINISHED
22044: FINISHED TASKS IN BATCHES

Final working program:

#!/usr/bin/env perl

use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use threads;

STDOUT->autoflush(1);

my @total_tasks = (1 .. 10);
my $sleep_time = 1;
my @engines = (qw/e1 e2 e3/);
my $sizes = {
  e1 => 2,
  e2 => 3,
  e3 => 2,
};

my $proc_hash;
my $global_string = "engine";

sub REAPER {
  local ($!, $?);
  while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
    if ( WIFEXITED($?) ) {
      my $ret_code = WEXITSTATUS($?);
      print "$$: REAPER: TASK_ORCHESTRATOR pid ($reaped_pid) has finished with status ($ret_code).\n";
      my $engine_name = $proc_hash->{$reaped_pid};
      delete ($proc_hash->{$reaped_pid});
      delete ($proc_hash->{$engine_name});
    }
  }
}

$SIG{CHLD} = \&REAPER;

sub random_sleep_time { return sprintf ("%.2f",(rand ($sleep_time||5) + 1)) }

sub task_runner {
  STDOUT->autoflush(1);
  my @args = @_;
  my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
  my ($batch, $engine) = split (/_/,$batch_engine);
  my $task_time = random_sleep_time();
  sleep ($task_time);
  print "$$: TASK_RUNNER: $global_string ($engine) finished batch #$batch task #$task in ($task_time) seconds\n";
  threads->exit(0);
};

sub task_orchestrator {
  my ($batch_engine, @tasks) = @_;
  my ($batch, $engine) = split (/_/,$batch_engine);
  my $task_orch_pid = fork();
  die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;

  if ($task_orch_pid != 0) {
    $proc_hash->{$engine} = $task_orch_pid;
    $proc_hash->{$task_orch_pid} = $engine;
  }

  if ($task_orch_pid == 0) {
    STDOUT->autoflush(1);
    my @tids;
    my $start_time = time;
    for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
    foreach my $task_number (0 .. $#tasks) {
      $tids [$task_number] = threads->create (
        \&task_runner,[$batch_engine,$tasks [$task_number]]
      );
    }
    foreach my $tid (@tids) {$tid->join()}
    my $end_time = time;
    my $total_time = sprintf ("%.2f",($end_time - $start_time));
    print "$$: TASK_ORCHESTRATOR: engine ($engine) finished batch #$batch tasks in ($total_time) seconds.\n";
    exit (0);
  }
}

my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks)
{
  foreach my $engine (@engines)
  {
    if (exists $proc_hash->{$engine})
    {
      sleep (1);
      next;
    }
    else
    {
      my @engine_tasks;
      my $engine_max_tasks = $sizes->{$engine};
      while ($engine_max_tasks-- != 0)
      {
        my $task = shift @total_tasks;
        push (@engine_tasks,$task) if $task;
      }
      if (@engine_tasks)
      {
        my $batch_engine = $batch.'_'.$engine;
        print "$$: MAIN: engine ($engine) is RUNNING batch #$batch tasks: (@engine_tasks)\n";
        task_orchestrator ($batch_engine,@engine_tasks);
        $batch++;
      }
    }
  }
}

# All 3 below work properly
#sleep (.2) while ((waitpid(-1, WNOHANG)) >= 0);
#sleep (.2) while ((waitpid(-1, WNOHANG)) != -1);
sleep (.2) while ((waitpid(-1, WNOHANG)) > -1);

print "$$: ALL ORCHESTRATORS HAVE FINISHED\n";
print "$$: FINISHED TASKS IN BATCHES\n";
__END__
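A side note on the final program, which the thread does not raise: Perl's built-in sleep and time work in whole seconds. A call like sleep("1.80") in task_runner therefore sleeps for 1 second. The orchestrator's elapsed time is measured with the built-in time, which is consistent with every TASK_ORCHESTRATOR line above reporting (1.00) seconds while its tasks report values up to 1.91. Likewise, sleep (.2) in the final drain loop is really sleep(0), so that loop busy-waits rather than pausing. Importing the fractional versions with use Time::HiRes qw(sleep time); changes both behaviours.

This small check is a sketch for illustration; the variable names are made up. It compares the built-in sleep with Time::HiRes::sleep:

```perl
use strict;
use warnings;
use Time::HiRes ();   # load the module without importing anything

# Built-in sleep: the fractional argument is truncated, so this is sleep(0).
my $t0 = Time::HiRes::time();
CORE::sleep(0.2);
my $core_elapsed = Time::HiRes::time() - $t0;

# Time::HiRes::sleep honours fractions of a second.
$t0 = Time::HiRes::time();
Time::HiRes::sleep(0.2);
my $hires_elapsed = Time::HiRes::time() - $t0;

printf "CORE::sleep(0.2) took %.3fs, Time::HiRes::sleep(0.2) took %.3fs\n",
    $core_elapsed, $hires_elapsed;
```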

Recommended answer

From the waitpid documentation: a non-blocking wait (with WNOHANG) can return 0 if there are child processes matching PID but none have terminated yet.

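With -1 this applies to any child process. Your code with multiple children will therefore surely encounter a zero return from the non-blocking waitpid in REAPER. This is precisely how we get to wait as long as there are non-terminated child processes. But your while loop exits at the first such zero. That is why the REAPER() call after the main loop returns immediately while the last batches are still running, and main goes on to print its final message and exit.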
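To see the three return values in isolation, here is a minimal sketch. It is an illustration, not part of the original answer, and it assumes a Unix-like system with a native fork. It forks one child and checks what waitpid reports in three situations: while the child runs, when it exits, and once no children remain. A loop whose condition is waitpid(-1, WNOHANG) > 0 stops on the first of these values.

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';

# Fork one child that stays alive for about a second.
my $pid = fork // die "Can't fork: $!";
if ($pid == 0) {
    sleep 1;
    exit 0;
}

# The child is still running: a non-blocking wait returns 0, not its PID.
my $first = waitpid(-1, WNOHANG);

# A blocking wait returns the PID once the child has exited.
my $second = waitpid(-1, 0);

# No children are left at all: waitpid now returns -1.
my $third = waitpid(-1, WNOHANG);

print "non-blocking while running: $first\n";
print "blocking: $second\n";
print "after all reaped: $third\n";
```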

One way around this is to poll for non-negative returns:

use warnings;
use strict;
use feature 'say';

use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep) ;

for (1..4) { 
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) { 
        sleep rand 4;  
        say "\tkid $$ exiting"; 
        exit;
    };  
}; 

while ( (my $kid = waitpid -1, WNOHANG) > -1 ) { 
    say "got $kid" if $kid > 0;
    sleep 0.2;
}

This prints:


        kid 12687 exiting
got 12687
        kid 12690 exiting
got 12690
        kid 12689 exiting
got 12689
        kid 12688 exiting
got 12688

Adjust the polling period as suitable. Since this catches any child process, it may interfere with other forks if there are any unwaited-for ones at that point.
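One way to avoid that interference is to poll only the PIDs you created yourself. This sketch is an illustration, not from the answer. It keeps the PIDs in a hypothetical %pending hash and calls waitpid on each one with WNOHANG, so any unrelated children are left alone. The child run times and exit codes are made up for the demonstration.

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep);

# Fork a few children and record only their PIDs (hypothetical %pending hash).
my %pending;
for my $n (1 .. 3) {
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) {
        sleep(0.3 * $n);   # simulate jobs of different lengths
        exit $n;           # the exit status identifies the child
    }
    $pending{$pid} = $n;
}

# Poll each known PID without blocking; children we did not fork are untouched.
my %status;
while (%pending) {
    for my $pid (keys %pending) {
        my $res = waitpid($pid, WNOHANG);
        next if $res == 0;   # this child is still running
        # $res == $pid: reaped now; $res == -1: it was already reaped elsewhere
        $status{$pid} = $res > 0 && WIFEXITED($?) ? WEXITSTATUS($?) : undef;
        delete $pending{$pid};
    }
    sleep(0.1);   # polling period; adjust as suitable
}

print "$_ exited with ", $status{$_} // 'unknown', "\n" for sort keys %status;
```

The trade-off is one waitpid call per outstanding child on every pass. For a handful of batch orchestrators that cost is negligible.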

Or you can block in the wait:

while ( (my $kid = waitpid -1, 0) > -1 ) { 
    say "got $kid";
}

Here you could also test for > 0, since a blocking call never returns 0. As before, though, we only need the loop to terminate once -1 comes back (no more child processes).

The major difference is that the loop body runs only after a child process has actually exited. If you need to keep tabs on what some long-running children are doing (and perhaps limit their run times or protect against hung jobs), that is not as easy in this form; you want a non-blocking operation for that.
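As an illustration of that non-blocking use, here is a sketch; it is not from the answer. The parent polls each child with WNOHANG and sends SIGTERM to any child that runs past a hypothetical $timeout. It assumes the children do not trap SIGTERM. The timeout value and the two simulated jobs are made up, and a killed child is still reaped on a later pass, so it never becomes a zombie.

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep time);

my $timeout = 1.0;   # hypothetical per-child time limit, in seconds

# One child finishes quickly; the other "hangs" far past the limit.
my %started;         # pid => start time, for children not yet reaped
for my $work (0.2, 10) {
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) {
        sleep($work);
        exit 0;
    }
    $started{$pid} = time;
}

my (%status, %killed);
while (%started) {
    for my $pid (keys %started) {
        my $res = waitpid($pid, WNOHANG);
        if ($res != 0) {                      # reaped now (pid) or already gone (-1)
            $status{$pid} = $res > 0 ? $? : undef;
            delete $started{$pid};
        }
        elsif (!$killed{$pid} && time - $started{$pid} > $timeout) {
            kill 'TERM', $pid;                # still running past its limit
            $killed{$pid} = 1;                # reaped on a later pass
        }
    }
    sleep(0.1);   # the parent is free to do other work between polls
}

for my $pid (sort keys %status) {
    print "$pid: ", ($killed{$pid} ? 'killed after timeout' : 'finished'), "\n";
}
```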

Note that some details, in particular those relating to return values, may vary across systems.

The naive version of this is to wait only for the specific PIDs you collected as you forked:

foreach my $pid (@pids) {
    my $gone = waitpid $pid, 0;
    say "Process $gone exited with $?" if $gone > 0;  # -1 if reaped already
}

This blocks in waitpid for each process in turn. The problem is that if one process runs much longer than the others (or hangs), the loop will be stuck waiting on it. And, in general, we'd rather have child processes reaped as they exit than in the order in which they were started.
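Applied to the batch scheduler in the question, these ideas could look like the following hedged sketch. It is not the asker's program, and it leaves out the threads. Whenever every engine is busy, main blocks in waitpid(-1, 0) until any child exits, in exit order rather than start order, instead of sleeping and polling. After the last batch is dispatched, main drains the remaining children. The engine names and batch sizes mirror the question's final program. run_batch, reap_one and the sleep standing in for the real work are hypothetical. No $SIG{CHLD} handler is installed, so only one place reaps children. Because it uses waitpid(-1, ...), the caveat about unrelated forks still applies.

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep);

# Engines and batch sizes as in the question's final program.
my @tasks   = (1 .. 10);
my @engines = qw(e1 e2 e3);
my %size    = (e1 => 2, e2 => 3, e3 => 2);

my %busy;            # engine => pid of the child running its current batch
my %engine_of;       # pid => engine
my @done;            # engines, in the order their batches were reaped
my $batches   = 0;   # number of batches started
my $tasks_run = 0;   # number of tasks handed to children

# Hypothetical helper: fork one child per batch (the real child would start
# and join its threads here instead of sleeping).
sub run_batch {
    my ($engine, @batch) = @_;
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) {
        sleep(0.1 * @batch);   # placeholder for the real work
        exit 0;
    }
    $busy{$engine}   = $pid;
    $engine_of{$pid} = $engine;
    $batches++;
    $tasks_run += @batch;
}

# Hypothetical helper: block until some child exits and free its engine.
# Returns the reaped pid, or -1 once there are no children left.
sub reap_one {
    my $pid = waitpid(-1, 0);
    return $pid if $pid <= 0;
    if (defined(my $engine = delete $engine_of{$pid})) {
        delete $busy{$engine};
        push @done, $engine;
    }
    return $pid;
}

while (@tasks) {
    my @free = grep { !exists $busy{$_} } @engines;
    unless (@free) {        # every engine is busy: block in waitpid, no polling
        reap_one();
        next;
    }
    for my $engine (@free) {
        last unless @tasks;
        run_batch($engine, splice(@tasks, 0, $size{$engine}));
    }
}

# Drain: main leaves this loop only after the last batch has exited.
1 while reap_one() > 0;
print "started $batches batches, reaped ", scalar(@done), "\n";
```

If main must also watch for hung batches, the blocking reap_one could be swapped for a WNOHANG poll with a timeout. The price is waking up periodically instead of sleeping until a child exits.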
