PHP pThreads - How do you perform garbage collection?
Given the following code, how can you ensure that the completed MyWorker objects are destroyed and their memory freed?
Because of what my script does, I need about 50 threads constantly obtaining data via cURL and processing it.
I've tried both having the threads never leave run(), and, as shown in this sample code, letting them leave run() and having the collect function spawn a new copy of them.
But no matter what, I hit the memory limit after a minute or so. Could you tell me what I'm doing wrong?
class MyWorker extends Threaded
{
    public $complete;
    public function __construct() { $this->complete = false; }
    public function run() { $this->complete = true; }
}

$pool = new Pool(50);
for ($i = 0; $i < 50; $i++)
    $pool->submit(new MyWorker());

$pool->collect(function($worker)
{
    global $pool;
    if ($worker->complete == true)
        $pool->submit(new MyWorker());
    return $worker->complete;
});
$pool->shutdown();
Why
Why should I collect anyway?
The Worker threads provided by pthreads require that the programmer retain the correct references to Threaded objects that are being executed. This is difficult for the programmer to achieve reliably in userland, so pthreads provides the Pool abstraction of Workers, which maintains references for you.
In order to maintain those references, pthreads needs to know when an object is garbage; it provides the Pool::collect interface for this purpose. Pool::collect takes a Closure which should accept a Threaded object and return boolean true if the passed object has finished executing.
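The contract described above can be illustrated with a small single-threaded model. This is only a sketch: ToyPool and ToyTask are hypothetical stand-ins invented for illustration, not pthreads classes, and the code says nothing about how pthreads implements Pool internally. It only shows the idea that a pool keeps a reference to each submitted task until the collector closure returns true for it.

```php
<?php
/* ToyTask and ToyPool are hypothetical names; this is plain PHP, not pthreads. */
class ToyTask {
    public $done = false;
    public function run() { $this->done = true; }
}

class ToyPool {
    private $tasks = [];

    /* keep a reference to the task for as long as it may still be needed */
    public function submit(ToyTask $task) {
        $this->tasks[] = $task;
    }

    /* drop every task the collector reports as finished;
       return the number of tasks still referenced */
    public function collect(callable $collector) {
        $kept = [];
        foreach ($this->tasks as $task) {
            if (!$collector($task)) {
                $kept[] = $task;
            }
        }
        $this->tasks = $kept;
        return count($this->tasks);
    }
}

$pool = new ToyPool();
$a = new ToyTask();
$b = new ToyTask();
$pool->submit($a);
$pool->submit($b);

$a->run();      /* only the first task finishes */
unset($a, $b);  /* the pool now holds the only references */

$remaining = $pool->collect(function (ToyTask $task) {
    return $task->done;
});

echo $remaining, "\n"; /* prints 1: the unfinished task is still held */
```

In this model, a task for which the collector never returns true stays referenced forever, which is one way memory can grow without bound.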
How
The task at hand ...
In order to keep submitting tasks for execution without exhausting resources, you must build a queue of completed tasks for resubmission to the Pool.
The following code demonstrates a sane way of doing this:
<?php
define("LOG", Mutex::create());

/* thread safe log to stdout */
function slog($message, $args = []) {
    $args = func_get_args();
    if (($message = array_shift($args))) {
        Mutex::lock(LOG);
        echo vsprintf("{$message}\n", $args);
        Mutex::unlock(LOG);
    }
}

class Request extends Threaded {
    protected $url;
    protected $queued = false;

    public function __construct($url) {
        $this->url = $url;
    }

    public function run() {
        $response = @file_get_contents($this->url);
        slog("%s returned %d bytes",
            $this->url, strlen($response));
        /* mark this request as finished and ready for resubmission */
        $this->reQueue();
    }

    public function getURL()   { return $this->url; }
    public function isQueued() { return $this->queued; }
    public function reQueue()  { $this->queued = true; }
}

/* create a pool of 50 threads */
$pool = new Pool(50);

/* submit 50 requests for execution */
for ($i = 1; $i <= 50; $i++) {
    $pool->submit(new Request(sprintf(
        "http://google.com/?q=%s", md5($i))));
}

do {
    $queue = array();
    $pool->collect(function($request) use (&$queue) {
        /* check for items to requeue */
        if ($request->isQueued()) {
            /* get the url for the request, insert into queue */
            $queue[] = $request->getURL();
            /* allow this job to be collected */
            return true;
        }
        /* still executing: keep the reference */
        return false;
    });

    /* resubmit completed tasks to pool */
    foreach ($queue as $queued) {
        $pool->submit(new Request($queued));
    }

    /* sleep for a couple of seconds here ... because, be nice ! */
    usleep(2.5 * 1000000);
} while (true);
?>