Criticism of this Python code (crawler with thread pool)


Question


How good is this Python code? I need criticism. There is an error in it: sometimes the script prints "ALL WAIT - CAN FINISH!" and then freezes (nothing else happens), but I can't find the reason why this occurs.


Site crawler with thread pool:

import sys
from urllib import urlopen
from BeautifulSoup import BeautifulSoup, SoupStrainer
import re
from Queue import Queue, Empty
from threading import Thread

W_WAIT = 1
W_WORK = 0

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, pool, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()
        self.pool = pool
        self.state = None

    def is_wait(self):
        return self.state == W_WAIT


    def run(self):
        while True:
            # if all workers wait - time to exit
            print "CHECK WAIT: !!! ",self.pool.is_all_wait()
            if self.pool.is_all_wait():
                print "ALL WAIT - CAN FINISH!"
                return
            try:
                func, args, kargs = self.tasks.get(timeout=3)
            except Empty:
                print "task wait timeout"
                continue

            self.state = W_WORK
            print "START !!! in thread %s" % str(self)
            #print args

            try: func(*args, **kargs)
            except Exception, e: print e
            print "!!! STOP in thread %s", str(self)
            self.tasks.task_done()
            self.state = W_WAIT
            #threads can fast empty it!
            #if self.tasks.qsize() == 0:
            #    print "QUIT!!!!!!"
            #    break

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        #self.tasks = Queue(num_threads)
        self.tasks = Queue()
        self.workers = []
        for _ in range(num_threads): 
            self.workers.append(Worker(self,self.tasks))


    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def is_all_wait(self):
        for w in self.workers:
            if not w.is_wait():
                return False
        return True

visited = set()
queue = Queue()
external_links_set = set()
internal_links_set = set()
external_links = 0

def process(pool,host,url):

    try:

        content = urlopen(url).read()
    except UnicodeDecodeError:
        return


    for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):
        try:
            href = link['href']
        except KeyError:
            continue


        if not href.startswith('http://'):
            href = 'http://%s%s' % (host, href)
        if not href.startswith('http://%s%s' % (host, '/')):
            continue

        internal_links_set.add(href)


        if href not in visited:
            visited.add(href)
            pool.add_task(process,pool,host,href)

        else:
            pass

def start(host,charset):
    pool = ThreadPool(20)
    pool.add_task(process,pool,host,'http://%s/' % (host))
    pool.wait_completion()

start('evgenm.com','utf8') 


Thanks for the help! I made a new implementation. What can you say about code #2?

================================== TRY #2 ==================================

import sys
from urllib import urlopen
from BeautifulSoup import BeautifulSoup, SoupStrainer
import re
from Queue import Queue, Empty
from threading import Thread


W_STOP = 1

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, pool, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.pool = pool
        self.state = None
        self.start()



    def stop(self):
        self.state = W_STOP

    def run(self):
        while True:
            if self.state == W_STOP:
                print "\ncalled stop"
                break
            try:
                func, args, kargs = self.tasks.get(timeout=3)
            except Empty:
                continue
            print "\n***START*** %s" % str(self)
            try: 
                func(*args, **kargs)
            except Exception, e: 
                print e
            print "\n***STOP*** %s", str(self)
            self.tasks.task_done()



class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        #self.tasks = Queue(num_threads)
        self.tasks = Queue()
        self.workers = []
        for _ in range(num_threads): 
            self.workers.append(Worker(self,self.tasks))


    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def stop_threads(self):
        for w in self.workers:
            w.stop()

    def wait_stop(self):
        self.wait_completion()
        self.stop_threads()



visited = set()
queue = Queue()
external_links_set = set()
internal_links_set = set()
external_links = 0

def process(pool,host,url):

    try:
        content = urlopen(url).read()
    except UnicodeDecodeError:
        return

    for link in BeautifulSoup(content, parseOnlyThese=SoupStrainer('a')):
        try:
            href = link['href']
        except KeyError:
            continue

        if not href.startswith('http://'):
            href = 'http://%s%s' % (host, href)
        if not href.startswith('http://%s%s' % (host, '/')):
            continue

        internal_links_set.add(href)

        if href not in visited:
            visited.add(href)
            pool.add_task(process,pool,host,href)
        else:
            pass

def start(host,charset):
    pool = ThreadPool(20)
    pool.add_task(process,pool,host,'http://%s/' % (host))
    pool.wait_stop()

start('evgenm.com','utf8')

Recommended answer


You are sharing state between threads (i.e., in is_all_wait) without synchronization. Plus, the fact that all threads are "waiting" is not a reliable indicator that the queue is empty (for instance, they could all be in the process of getting a task). I suspect that, occasionally, threads are exiting before the queue is truly empty. If this happens often enough, you will be left with tasks in the queue but no threads to run them. So queue.join() will wait forever.

My recommendations:

  1. Get rid of is_all_wait -- it's not a reliable indicator
  2. Get rid of the task state -- it's not really necessary
  3. Rely on queue.join to let you know when everything is processed (see the sketch below)
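
A minimal sketch of what that simplified pool could look like, keeping the original's Python 2 modules and style; the class and method names mirror the question's code but the details here are only illustrative, not a fixed API. The workers keep no state, block on tasks.get() with no timeout, and the main thread relies on tasks.join() alone to detect completion.

from Queue import Queue
from threading import Thread

class Worker(Thread):
    """Thread that runs tasks from a shared queue; it keeps no state of its own."""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True        # dies with the main thread, no explicit stop needed
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()   # block until a task is available
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            self.tasks.task_done()                 # lets tasks.join() make progress

class ThreadPool:
    """Pool of daemon threads consuming tasks from a single queue."""
    def __init__(self, num_threads):
        self.tasks = Queue()
        self.workers = [Worker(self.tasks) for _ in range(num_threads)]

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        # Blocks until every task that was put() has been task_done()'d;
        # this is the only completion signal the pool relies on.
        self.tasks.join()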


If you need to kill the threads (for example, this is part of a larger, long-running program), then do so after the queue.join().
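
One common way to do that kind of shutdown (not spelled out in the answer, so treat it as an illustration) is a per-worker sentinel, or "poison pill", enqueued only after the first tasks.join() returns. The sketch below builds on the pool above; the _STOP sentinel, StoppableWorker, and shutdown() names are mine.

from threading import Thread

_STOP = object()   # sentinel "poison pill"; a worker that receives it exits

class StoppableWorker(Thread):
    """Like Worker above, but exits cleanly when it pulls the _STOP sentinel."""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            item = self.tasks.get()
            if item is _STOP:
                self.tasks.task_done()   # acknowledge the pill, then exit
                return
            func, args, kargs = item
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            self.tasks.task_done()

def shutdown(pool):
    """Stop a pool whose workers are StoppableWorker instances."""
    pool.tasks.join()              # 1. wait until every real task is done
    for _ in pool.workers:
        pool.tasks.put(_STOP)      # 2. one pill per worker
    pool.tasks.join()              # 3. wait for the pills to be consumed
    for w in pool.workers:
        w.join()                   # 4. every worker thread has now exited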

