PHP popen process limit?

Question

I'm trying to extract a time consuming task to a separate process. Unfortunately, multi-threading does not really seem to be an option with PHP, but you can create new php processes, using popen.

The use case is this: there is a cronjob that runs every minute, which checks if there are any email campaigns that need to be sent. There could be multiple campaigns that need to be sent at the exact same time, but as of now it just picks up one campaign every minute. I would like to extract sending of the campaigns to a separate process, so that I can send multiple campaigns at the same time.

The code looks something like this (note that this is just a proof of concept):

crontab

* * * * * root /usr/local/bin/php /var/www/maintask.php 2>&1

maintask.php

for ($i = 0; $i < 4; $i++) {
    $processName = "Process_{$i}";
    echo "Spawn process {$processName}" . PHP_EOL;

    $process = popen("php subtask.php?process_name={$processName} 2>&1", "r");
    stream_set_blocking($process, false);
}

subtask.php

$process = $_GET['process_name'];

echo "Started sleeping process {$process}" . PHP_EOL;
sleep(rand(10, 40));
echo "Stopped sleeping process  {$process}" . PHP_EOL;

Now, the problem I'm having is that popen will only spawn 2 processes at any one time, while I'm trying to spawn 4. I cannot figure out why. There doesn't appear to be any documented limit. Perhaps this is limited by the number of cores I have available?

Recommended answer

I modified subtask.php so you can see when each task starts, when it ends, and how long it intends to sleep. Because each process now reports its own start and stop times, you can shorten the sleep times, and you don't need ps -aux to see which processes are running.
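
A note on the loop in the question, which the answer does not spell out: a likely cause of the "only 2 at a time" behaviour is that $process is overwritten on every iteration. When the last reference to a popen() handle goes away, PHP closes it, and closing a popen() handle waits for the child to exit, so the loop can stall on the previous child before it starts the next one. The sketch below assumes a Unix-like shell; spawnAll() and the command strings are hypothetical names used only for illustration. It keeps every handle in an array so that all children start before any of them is waited on.

```php
<?php
// Hypothetical helper (not from the original post): start every command first,
// keep each handle referenced, and only then read the output and close.
function spawnAll(array $commands): array
{
    $handles = [];
    foreach ($commands as $name => $command) {
        $handle = popen($command, 'r');
        if ($handle === false) {
            throw new RuntimeException("popen() failed for {$name}");
        }
        // Storing the handle keeps it referenced, so PHP does not close it
        // (and wait for the child) when $handle is reassigned.
        $handles[$name] = $handle;
    }

    $outputs = [];
    foreach ($handles as $name => $handle) {
        // Blocks until this child closes its stdout; the others keep running.
        $outputs[$name] = stream_get_contents($handle);
        pclose($handle); // waits for the child to exit
    }
    return $outputs;
}
```

With this shape, four subtasks that each sleep for a while should overlap rather than run two at a time. There is no cap on the number of children, which is what the class further down adds.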

subtask.php

<?php
$process = $argv[1];

$sleepTime = rand(1, 10);
echo date('Y-m-d H:i:s') . " - Started sleeping process {$process} ({$sleepTime})" . PHP_EOL;
sleep($sleepTime);
echo date('Y-m-d H:i:s') . " - Stopped sleeping process {$process}" . PHP_EOL;
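
The subtask above reads its name from $argv[1] because the PHP command-line binary does not populate $_GET, and a query string appended to the script name (as in the question) is treated as part of the file name. As a minimal sketch of how the parent could build such a command safely, the hypothetical helper buildSubtaskCommand() below quotes each part with escapeshellarg(); the function name is an assumption for illustration and is not part of the original answer.

```php
<?php
// Hypothetical helper: build the command line for a CLI subtask.
// escapeshellarg() quotes each argument so spaces or shell metacharacters
// in the process name cannot break the command or inject extra commands.
function buildSubtaskCommand(string $script, string $processName): string
{
    return 'php ' . escapeshellarg($script) . ' ' . escapeshellarg($processName);
}

// Usage: the subtask then reads the name from $argv[1].
// $command = buildSubtaskCommand('subtask.php', 'Process_0');
```

The exact quoting that escapeshellarg() produces depends on the platform, so the resulting string is best treated as opaque and passed straight to popen() or proc_open().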

I've added the class to the maintask.php code so you can test it. The fun starts when you queue() more entries than maxProcesses allows (try 32).
NOTE: the results come back in the order the processes complete.

maintask.php

<?php
class ParallelProcess
{
    private $maxProcesses = 16; // maximum processes
    private $arrProcessQueue = [];
    private $arrCommandQueue = [];
    private $returnValue = null; // exit code returned by the most recent proc_close()

    private function __construct()
    {
    }

    private function __clone()
    {
    }

    /**
     *
     * @return \static
     */
    public static function create()
    {
        $result = new static();
        return $result;
    }

    /**
     *
     * @param int $maxProcesses
     * @return \static
     */
    public static function load($maxProcesses = 16)
    {
        $result = self::create();
        $result->setMaxProcesses($maxProcesses);
        return $result;
    }

    /**
     * get maximum processes
     *
     * @return int
     */
    public function getMaxProcesses()
    {
        return $this->maxProcesses;
    }

    /**
     * set maximum processes
     *
     * @param int $maxProcesses
     * @return $this
     */
    public function setMaxProcesses($maxProcesses)
    {
        $this->maxProcesses = $maxProcesses;
        return $this;
    }

    /**
     * number of entries in the process queue
     *
     * @return int
     */
    public function processQueueLength()
    {
        $result = count($this->arrProcessQueue);
        return $result;
    }

    /**
     * number of entries in the command queue
     *
     * @return int
     */
    public function commandQueueLength()
    {
        $result = count($this->arrCommandQueue);
        return $result;
    }


    /**
     * process open
     *
     * @staticvar array $arrDescriptorspec
     * @param string $strCommand
     * @return $this
     * @throws \Exception
     */
    private function p_open($strCommand)
    {
        static $arrDescriptorSpec = array(
            0 => array('file', '/dev/null', 'r'), // stdin is a file that the child will read from
            1 => array('pipe', 'w'), // stdout is a pipe that the child will write to
            2 => array('file', '/dev/null', 'w') // stderr is a file that the child will write to (discarded)
        );

        $arrPipes = array();
        if (($resProcess = proc_open($strCommand, $arrDescriptorSpec, $arrPipes)) === false) {
            throw new \Exception("error: proc_open() failed!");
        }

        $resStream = &$arrPipes[1];

        if (($blnSetBlockingResult = stream_set_blocking($resStream, true)) === false) {
            throw new \Exception("error: stream_set_blocking() failed!");
        }

        $this->arrProcessQueue[] = array(&$strCommand, &$resProcess, &$resStream);
        return $this;
    }

    /**
     * execute any queued commands
     *
     * @return $this
     */
    private function executeCommand()
    {
        while ($this->processQueueLength() < $this->maxProcesses and $this->commandQueueLength() > 0) {
            $strCommand = array_shift($this->arrCommandQueue);
            $this->p_open($strCommand);
        }
        return $this;
    }

    /**
     * process close
     *
     * @param array $arrQueueEntry
     * @return $this
     */
    private function p_close(array $arrQueueEntry)
    {
        $resProcess = $arrQueueEntry[1];
        $resStream = $arrQueueEntry[2];

        fclose($resStream);

        $this->returnValue = proc_close($resProcess);

        $this->executeCommand();
        return $this;
    }

    /**
     * queue command
     *
     * @param string $strCommand
     * @return $this
     */
    public function queue($strCommand) {
        // put the command on the $arrCommandQueue
        $this->arrCommandQueue[] = $strCommand;
        $this->executeCommand();
        return $this;
    }

    /**
     * read from stream
     *
     * @param resource $resStream
     * @return string
     */
    private static function readStream($resStream)
    {
        $result = '';
        while (($line = fgets($resStream)) !== false) {
            $result .= $line;
        }
        return $result;
    }

    /**
     * read a result from the process queue
     *
     * @return string|false
     */
    private function readProcessQueue()
    {
        $result = false;
        // each() was removed in PHP 8, so iterate with foreach and stop at the first finished process
        foreach ($this->arrProcessQueue as $key => $arrQueueEntry) {
            $arrStatus = proc_get_status($arrQueueEntry[1]);
            if ($arrStatus['running'] === false) {
                array_splice($this->arrProcessQueue, $key, 1);
                $resStream = $arrQueueEntry[2];
                $result = self::readStream($resStream);
                $this->p_close($arrQueueEntry);
                break; // return one result per call
            }
        }
        return $result;
    }

    /**
     * get result from process queue
     *
     * @return string|false
     */
    public function readNext()
    {
        $result = false;
        // keep polling until a process finishes or the process queue is empty
        while ($result === false and $this->processQueueLength() > 0) {
            $result = $this->readProcessQueue();
        }
        return $result;
    }
}

set_time_limit(0); // don't timeout

$objParallelProcess = ParallelProcess::load(8); // allow up to 8 parallel processes

for ($i = 0; $i < 4; $i++) {
    $processName = "Process_{$i}";
    echo date('Y-m-d H:i:s') . " - Queue process {$processName}" . PHP_EOL;
    $objParallelProcess->queue("php subtask.php {$processName}"); // queue process
}

// loop through process queue
while (($strResponse = $objParallelProcess->readNext()) !== false) { // read next result and run next command if one is queued
    // process response
    echo $strResponse;
}
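
The class above polls proc_get_status() in a loop and only reads a child's output after the child has exited, so a child that writes more than the pipe buffer can hold may block before it finishes. As a hedged alternative, here is a compact sketch of the same capped-pool idea that waits on the pipes with stream_select() and reads output as it arrives. It is not the answer's implementation: runParallel() is a hypothetical function name, and the sketch assumes a Unix-like system where /dev/null exists.

```php
<?php
/**
 * Run shell commands with at most $maxProcesses children alive at once.
 * Returns each command's stdout, in the order the commands complete.
 * Hypothetical helper, not part of the ParallelProcess class above.
 */
function runParallel(array $commands, int $maxProcesses = 8): array
{
    $maxProcesses = max(1, $maxProcesses);
    $spec = [
        0 => ['file', '/dev/null', 'r'], // child stdin: nothing to read
        1 => ['pipe', 'w'],              // child stdout: captured by the parent
        2 => ['file', '/dev/null', 'w'], // child stderr: discarded
    ];
    $running = []; // entries: ['proc' => resource, 'pipe' => resource, 'out' => string]
    $results = [];

    while ($commands !== [] || $running !== []) {
        // Top up the pool until the cap is reached or nothing is left to start.
        while ($commands !== [] && count($running) < $maxProcesses) {
            $pipes = [];
            $proc = proc_open(array_shift($commands), $spec, $pipes);
            if ($proc === false) {
                throw new RuntimeException('proc_open() failed');
            }
            stream_set_blocking($pipes[1], false);
            $running[] = ['proc' => $proc, 'pipe' => $pipes[1], 'out' => ''];
        }

        // Sleep (up to 1 second) until at least one child has output or has closed stdout.
        $read = array_column($running, 'pipe');
        $write = null;
        $except = null;
        if (stream_select($read, $write, $except, 1) === false) {
            throw new RuntimeException('stream_select() failed');
        }

        foreach ($running as $key => $entry) {
            if (!in_array($entry['pipe'], $read, true)) {
                continue; // nothing new from this child yet
            }
            $chunk = fread($entry['pipe'], 8192);
            if (is_string($chunk)) {
                $running[$key]['out'] .= $chunk;
            }
            if (feof($entry['pipe'])) {
                // The child closed stdout: collect its output and free the slot.
                fclose($entry['pipe']);
                proc_close($entry['proc']);
                $results[] = $running[$key]['out'];
                unset($running[$key]);
            }
        }
    }
    return $results;
}
```

A call such as runParallel(["php subtask.php Process_0", "php subtask.php Process_1"], 8) would return the two outputs once both subtasks finish. Unlike readNext(), this sketch returns everything at the end rather than one result at a time.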
