PHP pThreads - 你如何执行垃圾收集? [英] PHP pThreads - How do you perform garbage collection?

查看:89
本文介绍了PHP pThreads - 你如何执行垃圾收集?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

给定以下代码,如何确保已完成的MyWorker对象被销毁/释放内存?

由于我的脚本的作用,我需要约50个线程不断从cURL获取数据并对其进行处理。



我已经尝试了线程从不离开 run(),或者如本示例代码所示他们离开运行并具有收集功能产生它们的新副本。

但是无论我在一分钟左右的时间内遇到了什么内存限制,你能告诉我我做错了什么吗?

  class MyWorker扩展了Threaded 
{
public $ complete;
public function __construct(){$ this-> complete = false;}
public function run(){$ this-> complete = true;}
}

$ pool = new Pool(50);
for($ i = 0; $ i <50; $ i ++)
$ pool-> submit(new MyWorker());
$ pool-> collect(function($ worker)
{
global $ pool;
if($ worker-> complete == true)
$ pool-> submit(new MyWorker());
return $ worker-> complete;
});
$ pool-> shutdown();


解决方案

为什么



为什么要收集?



Worker 通过pthreads要求程序员保留对正在执行的 Threaded 对象的正确引用。这对于程序员来说在用户空间中可靠地实现是很困难的,所以pthreads提供了 Pool > Workers 的抽象,为了维护这些引用,pthread需要知道对象何时是垃圾,它提供了 Pool :: collect 接口为此目的。 Pool :: collect 需要一个Closure,它应该接受一个 Threaded 对象并返回布尔值 true 如果传递的对象完成执行。



如何



手头上...



为了保持提交任务的执行并且不耗尽资源,您必须创建一个已完成任务的队列,以重新提交给 Pool



以下代码演示了这样做的一种明智方式:

 <?php 

define(LOG,Mutex :: create());
/ *线程安全日志到标准输出* /
函数slog($ message,$ args = []){
$ args = func_get_args(); $()
if(($ message = array_shift($ args))){
Mutex :: lock(LOG);
echo vsprintf(
{$ message} \\\
,$ args);
Mutex :: unlock(LOG);



类请求扩展Threaded {
public function __construct($ url){
$ this-> url = $ url;


public function run(){
$ response = @file_get_contents($ this-> url);
$ b $ slog(%s返回%d个字节,
$ this-> url,strlen($ response));

$ this-> reQueue();
}

public function getURL(){return $ this-> url; }

public function isQueued(){return $ this-> queued; }
public function reQueue(){$ this-> queued = true; }

保护$ url;
保护$ queued = false;
}

/ *创建50个线程池* /
$ pool = new Pool(50);

提交50个执行请求* /
while(@ $ i ++ <50){
$ pool-> submit(new Request(sprintf(
http://google.com/?q=%s,md5($ i))));
}

do {
$ queue = array();

$ pool-> collect(函数($请求)使用($ pool,& $ queue){
/ *检查重新计算的项目* /
if $ request $> isQueued()){
/ *获取请求的URL,插入队列* /
$ queue [] =
$ request-> getURL();
/ *允许收集此工作* /
返回true;
}
});
$ b $ * / *重新提交已完成的任务到池* /
if(count($ queue)){
foreach($ queue as $ queued)
$ pool-> ;提交(新请求($ queued));
}

/ *在这里睡了几秒钟......因为,很好! * /
usleep(2.5 * 1000000);
} while(true);
?>


Given the following code, how can you ensure that the completed MyWorker objects are destroyed/their memory freed?

Due to what my script does I need ~50 threads constantly obtaining data from cURL, and processing it.

I've tried both having the threads never leave run(), or as shown in this sample code where they leave run and have the collect function spawn a new copy of them.

But not matter what I hit the memory limits after a minute or so. Could you tell me what I'm doing wrong?

class MyWorker extends Threaded
{
    public $complete;
    public function __construct() {$this->complete = false;}
    public function run() {$this->complete = true;}
}

$pool = new Pool(50);
for($i=0; $i<50; $i++)
    $pool->submit(new MyWorker());
$pool->collect(function($worker)
{
    global $pool;
    if($worker->complete == true)
        $pool->submit(new MyWorker());
    return $worker->complete;
});
$pool->shutdown();

解决方案

Why

Why should I collect anyway ?

The Worker threads provided by pthreads require that the programmer retain the correct references to Threaded objects that are being executed. This is difficult for the programmer to achieve in userland reliably, so pthreads provides the Pool abstraction of Workers which maintains references for you.

In order to maintain those reference pthreads needs to know when an object is garbage, it provides the Pool::collect interface for this purpose. Pool::collect takes a Closure which should accept a Threaded object and return boolean true if the passed object is finished executing.

How

The task at hand ...

In order to keep submitting tasks for execution and not exhaust resources, you must create a queue of completed tasks for resubmission to the Pool

The following code demonstrates a sane way of doing this:

<?php

define("LOG", Mutex::create());
/* thread safe log to stdout */
function slog($message, $args = []) {
    $args = func_get_args();
    if (($message = array_shift($args))) {
        Mutex::lock(LOG);
        echo vsprintf(
            "{$message}\n", $args);
        Mutex::unlock(LOG);
    }
}

class Request extends Threaded {
    public function __construct($url) {
        $this->url = $url;
    }

    public function run() {
        $response = @file_get_contents($this->url);

        slog("%s returned %d bytes",
            $this->url, strlen($response));

        $this->reQueue();
    }

    public function getURL()        { return $this->url; }

    public function isQueued()      { return $this->queued; }
    public function reQueue()       { $this->queued = true; }

    protected $url;
    protected $queued = false;
}

/* create a pool of 50 threads */
$pool = new Pool(50);

/* submit 50 requests for execution */
while (@$i++<50) {
    $pool->submit(new Request(sprintf(
        "http://google.com/?q=%s", md5($i))));
}

do {
    $queue = array();

    $pool->collect(function($request) use ($pool, &$queue) {
        /* check for items to requeue */
        if ($request->isQueued()) {
            /* get the url for the request, insert into queue */
            $queue[] = 
                $request->getURL();
            /* allow this job to be collected */
            return true;
        }
    });

    /* resubmit completed tasks to pool */
    if (count($queue)) {
        foreach ($queue as $queued)
            $pool->submit(new Request($queued));
    }

    /* sleep for a couple of seconds here ... because, be nice ! */
    usleep(2.5 * 1000000);
} while (true);
?>

这篇关于PHP pThreads - 你如何执行垃圾收集?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆