Twisted/Python - processing a large file line by line


Problem description

I'm reading a file and processing it with the code below. The file is very large, about 12 million lines, so at the moment I split it manually into 1000-line files and launch a separate process for each chunk of 1000 lines sequentially (via a bash script).

Is there a way, using Twisted, to load one file and process it 1000 items at a time (a progress bar would be nice), without me splitting it manually?

scanner.py

import argparse

from tqdm import tqdm
from sys import argv
from pprint import pformat

from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers

import lxml.html

from geoip import geolite2
import pycountry

from tld import get_tld
import json
import re  # needed for re.search() in cbBody
import socket

poweredby = ""
server = ""
ip = ""


def cbRequest(response, url):
    global poweredby, server, ip
    # print 'Response version:', response.version
    # print 'Response code:', response.code
    # print 'Response phrase:', response.phrase
    # print 'Response headers:'
    # print pformat(list(response.headers.getAllRawHeaders()))
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
    server = response.headers.getRawHeaders("Server")[0]

    #print poweredby
    #print server

    d = readBody(response)
    d.addCallback(cbBody, url)
    return d


def cbBody(body, ourl):
    global poweredby, server,ip

    #print body
    html_element = lxml.html.fromstring(body)
    generator = html_element.xpath("//meta[@name='generator']/@content")

    ip = socket.gethostbyname(ourl)

    try:
        match = geolite2.lookup(ip)
        if match is not None:
            country = match.country
            try:

                c = pycountry.countries.lookup(country)
                country = c.name
            except:
                country = ""

    except:
        country = ""
    try:
        res = get_tld("http://www" + ourl, as_object=True)
        tld = res.suffix
    except:
        tld = ""

    try:
        match = re.search(r'[\w.-]+@[\w.-]+', body)
        email = match.group(0)
    except:
        email = ""

    permalink=ourl.rstrip().replace(".","-")

    try:
        item = generator[0]
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"
    except:
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"


    print val

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Scanner v0.99')
    parser.add_argument(
        '-i', '--input', help='Input list of domains', required=True)
    args = parser.parse_args()
    input = args.input

with open(input) as f:
    urls = f.read().splitlines()


def mainjob(reactor, urls=urls):
    for url in tqdm(urls):
        agent = Agent(reactor)
        d = agent.request(
            'GET', "http://" + url,
            Headers({'User-Agent': ['bot']}),
            None)
        d.addCallback(cbRequest, url)
        d.addErrback(lambda x: None)  # ignore errors
    return d


react(mainjob, argv[3:])

Update 1:

This is how I execute it at the moment:

file.txt - 12,000,000 lines

chunk01.txt - a file of 1000 lines . . .

I execute one run of the script for each chunk file:

python scanner.py chunk01.txt
python scanner.py chunk02.txt
.
.
.

To execute the script once:

python scanner.py file.txt

The problem is that I need to pass the URLs as a parameter to action(). If I read all 12,000,000 of them into memory (via f.read()), it is too big. Hence I split the file and execute the script on each small file.
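For reference, the splitting step itself (described above as a bash script) could be reproduced in Python along these lines; a minimal sketch, assuming the chunks are named chunk01.txt, chunk02.txt, and so on:

def split_file(path, lines_per_chunk=1000):
    # Write each run of `lines_per_chunk` lines to its own chunkNN.txt file,
    # streaming the source so the 12M-line file is never held in memory.
    chunk = None
    with open(path) as src:
        for i, line in enumerate(src):
            if i % lines_per_chunk == 0:
                if chunk is not None:
                    chunk.close()
                chunk = open("chunk%02d.txt" % (i // lines_per_chunk + 1), "w")
            chunk.write(line)
    if chunk is not None:
        chunk.close()

split_file("file.txt")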

Hope it's clearer now...

Update 2:

Based on @Jean-Paul Calderone's answer, I wrote this code.

It seems to work, but something puzzles me:

Over 180,000 iterations... I assume 180,000 domains (one per line of the input file), the script printed/output only about 35,707 lines (entries). I expected it to be around 180,000. I know some domains will time out. When I ran it the "old" way it was more consistent: the numbers were closer, i.e. the number of input domains was close to the number of output lines in the output file.

Could there be something "bad" in the code? Any ideas?

python scanner.py > out.txt

181668it [1:47:36,  4.82it/s]

And counting the lines:

wc -l out.txt
36840 out.txt

scanner.py

import argparse

from tqdm import tqdm
from sys import argv
from pprint import pformat

from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
from twisted.internet.task import cooperate
from twisted.internet.defer import gatherResults

import lxml.html

from geoip import geolite2
import pycountry

from tld import get_tld
import json
import re  # needed for re.search() in cbBody
import socket

poweredby = ""
server = ""
ip = ""


def cbRequest(response, url):
    global poweredby, server, ip
    # print 'Response version:', response.version
    # print 'Response code:', response.code
    # print 'Response phrase:', response.phrase
    # print 'Response headers:'
    # print pformat(list(response.headers.getAllRawHeaders()))
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
    server = response.headers.getRawHeaders("Server")[0]

    #print poweredby
    #print server

    d = readBody(response)
    d.addCallback(cbBody, url)
    return d


def cbBody(body, ourl):
    global poweredby, server,ip

    #print body
    html_element = lxml.html.fromstring(body)
    generator = html_element.xpath("//meta[@name='generator']/@content")

    ip = socket.gethostbyname(ourl)

    try:
        match = geolite2.lookup(ip)
        if match is not None:
            country = match.country
            try:

                c = pycountry.countries.lookup(country)
                country = c.name
            except:
                country = ""

    except:
        country = ""
    try:
        res = get_tld("http://www" + ourl, as_object=True)
        tld = res.suffix
    except:
        tld = ""

    try:
        match = re.search(r'[\w.-]+@[\w.-]+', body)
        email = match.group(0)
    except:
        email = ""

    permalink=ourl.rstrip().replace(".","-")

    try:
        item = generator[0]
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"
    except:
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"


    print val


def main(reactor, url_path):
    urls = open(url_path)
    return mainjob(reactor, (url.strip() for url in urls))

def mainjob(reactor, urls=argv[2:]):
    #for url in urls:
    #  print url
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    tasks = list(cooperate(work) for i in range(100))
    return gatherResults(list(task.whenDone() for task in tasks))



def process(agent, url):
    d = agent.request(
        'GET', "http://" + url,
        Headers({'User-Agent': ['bot']}),
        None)
    d.addCallback(cbRequest, url)
    d.addErrback(lambda x: None)  # ignore errors
    return d

react(main, ["./domains.txt"])

Update 3:

I updated the code to print errors to errors.txt:

import argparse

from tqdm import tqdm
from sys import argv
from pprint import pformat

from twisted.internet.task import react
from twisted.web.client import Agent, readBody
from twisted.web.http_headers import Headers
from twisted.internet.task import cooperate
from twisted.internet.defer import gatherResults

import lxml.html

from geoip import geolite2
import pycountry

from tld import get_tld
import json
import re  # needed for re.search() in cbBody
import socket

poweredby = ""
server = ""
ip = ""

f = open("errors.txt", "w")


def error(response, url):
    f.write("Error: " + url + "\n")


def cbRequest(response, url):
    global poweredby, server, ip
    # print 'Response version:', response.version
    # print 'Response code:', response.code
    # print 'Response phrase:', response.phrase
    # print 'Response headers:'
    # print pformat(list(response.headers.getAllRawHeaders()))
    poweredby = response.headers.getRawHeaders("X-Powered-By")[0]
    server = response.headers.getRawHeaders("Server")[0]

    #print poweredby
    #print server

    d = readBody(response)
    d.addCallback(cbBody, url)
    return d


def cbBody(body, ourl):
    global poweredby, server,ip

    #print body
    html_element = lxml.html.fromstring(body)
    generator = html_element.xpath("//meta[@name='generator']/@content")

    ip = socket.gethostbyname(ourl)

    try:
        match = geolite2.lookup(ip)
        if match is not None:
            country = match.country
            try:

                c = pycountry.countries.lookup(country)
                country = c.name
            except:
                country = ""

    except:
        country = ""
    try:
        res = get_tld("http://www" + ourl, as_object=True)
        tld = res.suffix
    except:
        tld = ""

    try:
        match = re.search(r'[\w.-]+@[\w.-]+', body)
        email = match.group(0)
    except:
        email = ""

    permalink=ourl.rstrip().replace(".","-")

    try:
        item = generator[0]
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\",\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":" + json.dumps(item) + ",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"
    except:
        val = "{ \"Domain\":" + json.dumps(
            "http://" + ourl.rstrip()) + ",\"IP\":\"" + ip + "\"," + "\"Server\":" + json.dumps(
            str(server)) + ",\"PoweredBy\":" + json.dumps(
                str(poweredby)) + ",\"MetaGenerator\":\"\",\"Email\":" + json.dumps(
                    email) + ",\"Suffix\":\"" + tld + "\",\"CountryHosted\":\"" + country + "\",\"permalink\":\"" + permalink + "\" }"


    print val


def main(reactor, url_path):
    urls = open(url_path)
    return mainjob(reactor, (url.strip() for url in urls))

def mainjob(reactor, urls=argv[2:]):
    #for url in urls:
    #  print url
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    tasks = list(cooperate(work) for i in range(100))
    return gatherResults(list(task.whenDone() for task in tasks))



def process(agent, url):
    d = agent.request(
        'GET', "http://" + url,
        Headers({'User-Agent': ['crawler']}),
        None)
    d.addCallback(cbRequest, url)
    d.addErrback(error, url) 
    return d

react(main, ["./domains.txt"])

f.close()

Update 4:

I captured the traffic with Wireshark, using only the 2 domains that errored previously:

user@laptop:~/crawler$ python scanner.py 
2it [00:00, 840.71it/s]
user@laptop:~/crawler$ cat errors.txt 
Error: google.al
Error: fau.edu.al

As you can see, they errored, yet in Wireshark I can see responses for them.
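One plausible explanation (an assumption, not confirmed in the thread): cbRequest indexes the result of getRawHeaders() directly, and Headers.getRawHeaders() returns None when the response lacks that header, so a perfectly good response without a Server or X-Powered-By header raises a TypeError that lands in errors.txt. Passing a default sidesteps this:

    # hypothetical fix inside cbRequest: getRawHeaders(name, default) returns
    # the default when the header is absent, so such responses no longer raise
    poweredby = response.headers.getRawHeaders("X-Powered-By", [""])[0]
    server = response.headers.getRawHeaders("Server", [""])[0]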

Answer

You need to add a limit to the amount of concurrency your program creates. Currently, you process all of the URLs you are given at the same time - or at least you try to:

def mainjob(reactor, urls=urls):
    for url in tqdm(urls):
        agent = Agent(reactor)
        d = agent.request(
            'GET', "http://" + url,
            Headers({'User-Agent': ['bot']}),
            None)
        d.addCallback(cbRequest, url)
        d.addErrback(lambda x: None)  # ignore errors
    return d

This issues a request to every URL without waiting for any of them to complete. Instead, use twisted.internet.task.cooperate to run a limited number at a time. This runs one request at a time:

def mainjob(reactor, urls):
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    task = cooperate(work)
    return task.whenDone()

def process(agent, url):
    d = agent.request(
        'GET', "http://" + url,
        Headers({'User-Agent': ['bot']}),
        None)
    d.addCallback(cbRequest, url)
    d.addErrback(lambda x: None)  # ignore errors
    return d

You probably want more than 1 at a time, though. So, call cooperate() a few more times:

def mainjob(reactor, urls=urls):
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls))
    tasks = list(cooperate(work) for i in range(100))
    return gatherResults(list(task.whenDone() for task in tasks))

This runs up to 100 requests at a time. Each task pulls the next element from work and waits for it. gatherResults waits for all 100 tasks to finish.

Now just avoid loading the complete input into memory at once:

def main(reactor, url_path):
    urls = open(url_path)
    return mainjob(reactor, (url.strip() for url in urls))

react(main, ["path-to-urls.txt"])

This opens the URL file but reads lines from it only as they are needed.
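Since the question also asked for a progress bar: tqdm can only show a percentage when it knows the total, which a lazy generator cannot report. A sketch, assuming one extra sequential pass over the file to count lines is acceptable and that mainjob is adjusted to forward the total:

def count_lines(path):
    # One cheap streaming pass; the file still never sits in memory.
    with open(path) as f:
        return sum(1 for _ in f)

def main(reactor, url_path):
    total = count_lines(url_path)
    urls = open(url_path)
    return mainjob(reactor, (url.strip() for url in urls), total)

def mainjob(reactor, urls, total=None):
    agent = Agent(reactor)
    work = (process(agent, url) for url in tqdm(urls, total=total))
    tasks = list(cooperate(work) for i in range(100))
    return gatherResults(list(task.whenDone() for task in tasks))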
