Is Scrapy's asynchronicity what is hindering my CSV results file from being created straightforwardly?

Problem description

My Scrapy project "drills down" from list pages, retrieving data for the listed items at varying depths, up to several levels deep. There could be many pages of listed items, each with a handful of different items/links. I'm collecting the details of each item (and storing them in a single CSV Excel file) from: the page it is listed on, the linked "more details" page in that list, and yet another page, say the original listing by the item's manufacturer.

Because I am building a CSV file, it would be VERY helpful to put each item's data on a single line before my parse process moves along to the next item. I could do it nicely if only I could get a Request to launch on demand while I am writing the CSV line for that item on the list page it appears on. I would just "drill down" as many levels as needed, with a different parse function for each level if necessary, staying with a single item all the way until I have the entire CSV line it needs.

Instead of it being that easy, it appears that I am going to have to re-write the CSV file for EVERY ITEM at EVERY LEVEL, because I can't get Scrapy to give me the responses for an item's "more details" links until I've exited the entire parse function for the listing page. By then the end of my CSV file is no longer at the item being processed, so I have to keep a unique field on each line to look each item up at each level, re-write the file, and so on.

Understand, I can't know which callback level will be the last one for any particular item; that is determined on an item-by-item basis, and some items won't even have "deeper" levels. My only remaining idea is to have a single recursive callback function that handles all callback levels, but is that the way the rest of you do this kind of thing, or does Scrapy have some means of "request and wait for the response", or something similar? I'm not wanting to install an SQL database on my laptop, never having set one up before.

Thanks!
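
(For reference, the usual Scrapy answer to "request and wait for the response" is to chain the callbacks instead: carry the partially-built item along in the Request's meta dict and only yield it from the deepest callback that still has data to add, letting an exporter write the finished item as one CSV row. Below is a minimal sketch of that pattern; the spider name, item fields and XPaths are hypothetical, and it is separate from the question's own spider further down.)

from scrapy.spider import Spider
from scrapy.selector import Selector
from scrapy.http import Request
from scrapy.item import Item, Field

class ChainedItem(Item):
    title = Field()
    details = Field()
    maker = Field()

class ChainedSpider(Spider):
    name = "chain_example"
    start_urls = ["http://www.example.com/list"]

    def parse(self, response):
        # level 1: the list page
        for ad in Selector(response).xpath("//article"):
            item = ChainedItem()
            item['title'] = ad.xpath(".//a/@title").extract()
            details_link = ad.xpath(".//a/@href").extract()[0]  # join with the site prefix if hrefs are relative
            # pass the half-built item along instead of writing it now
            yield Request(details_link, meta={'item': item}, callback=self.parse_details)

    def parse_details(self, response):
        # level 2: the "more details" page
        item = response.meta['item']
        hxs = Selector(response)
        item['details'] = hxs.xpath("//div[@id='details']//text()").extract()
        maker_link = hxs.xpath("//a[@class='maker']/@href").extract()
        if maker_link:
            yield Request(maker_link[0], meta={'item': item}, callback=self.parse_maker)
        else:
            yield item  # no deeper level: this item's row is complete here

    def parse_maker(self, response):
        # level 3: the manufacturer's original listing
        item = response.meta['item']
        item['maker'] = Selector(response).xpath("//h1/text()").extract()
        yield item  # fully assembled; the exporter writes it as one CSV row

Running the sketch with "scrapy crawl chain_example -o results.csv -t csv" makes Scrapy's built-in CSV feed exporter write one row per yielded item, so no manual CSV bookkeeping is needed.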

from scrapy.spider import Spider
from scrapy.selector import Selector
from scrapy.contrib.spiders import CrawlSpider, Rule
from scrapy.contrib.exporter import CsvItemExporter
import csv
from meow.items import meowItem, meowPage
from scrapy.http import Request
import os
from mmap import mmap

class meowlistpage(Spider):
    name="melist"
    prefixhref='http://www.meow.com'
    #add '2_p/', '3_p/', or '4_p/', etc. to get to meow's other pages
    start_urls = [prefixhref+"/homes/for_sale/CO/house,mobile,land_type/10_rid/3000-30000_price/11-117_mp/800000-8000000_lot/lot_sort/46.377254,-96.82251,30.845647,-114.312744_rect/5_zm/1_p/1_rs/"]
    print 'Retrieving first page...'
    def parse(self, response):
        print 'First page retrieved'
        name="melist";prefixhref='http://www.meow.com';
        csvfilename = 'C:\\Python27\\My scripts\\meow\\'+name+'.csv';csvfile = open(csvfilename, 'w');pass;csvfile.close()
        hxs = Selector(response)
        page_tags=hxs.xpath("//div[@id='search-results']/article")
        for page_tags in page_tags:
            item = meowItem()
            item['ad_link']=prefixhref+str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/@href").extract())[3:-2]
            idendplace=str(item['ad_link']).index('_zpid')-12; backhashstr=str(item['ad_link'])[idendplace:]; 
            idstartplace=backhashstr.index('/')+1; idendplace=len(backhashstr)-backhashstr.index('_zpid');
            item['zpid']=str(backhashstr)[idstartplace:-idendplace]
            item['sale_sold']=str(page_tags.xpath(".//div[1]/dl[1]/dt[1]/@class").extract())[8:-17]#"recentlySold" or "forSale"
            item['prop_price']=str(page_tags.xpath(".//div[1]/dl[1]/dt[2]/strong/text()").extract())[3:-2]
            if (str(item['sale_sold'])=='recentlySold'):item['prop_price']=str(item['prop_price'])+str(page_tags.xpath(".//div[1]/dl[1]/dt[1]/strong/text()").extract())[3:-2]
            try:
                dollrsgn=item['prop_price'].index('$');item['prop_price']=str(item['prop_price'])[dollrsgn:]
            except:pass
            item['ad_title']=str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/@title").extract())[3:-2]
            prop_latitude1=page_tags.xpath("@latitude").extract();item['prop_latitude']=str(prop_latitude1)[3:-8]+'.'+str(prop_latitude1)[5:-2]
            prop_longitude1=page_tags.xpath("@longitude").extract();item['prop_longitude']=str(prop_longitude1)[3:-8]+'.'+str(prop_longitude1)[7:-2]
            item['prop_address']=str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[1]/text()").extract())[3:-2]+', '+str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[2]/text()").extract())[3:-2]+', '+str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[3]/text()").extract())[3:-2]+'  '+str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/span[4]/text()").extract())[3:-2]
            mightmentionacres = str(page_tags.xpath(".//div[1]/dl[2]/dt[2]/text()").extract())[3:-2]+' | '+str(page_tags.xpath(".//div[1]/dl[2]/dt[2]/text()").extract())[3:-2]+' | '+str(page_tags.xpath(".//div[1]/dl[2]/dt[1]/span[1]/span[1]/a/@title").extract())[3:-2]+' | '#+str()[3:-2]#this last segment comes from full ad
            item['prop_acres'] = mightmentionacres

            #Here is where I'm talking about

            yield Request(str(item['ad_link']), meta={'csvfilename':csvfilename, 'item':item}, dont_filter=True, callback = self.getthispage)

            #By this point, I wanted all the callbacks to have executed, but they haven't - Scrapy doesn't launch them until after this function completes

            csvfile = open(csvfilename, 'ab')
            outwriter = csv.writer(csvfile, delimiter=';', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            # parcelnum and lot_width were meant to be filled in by the deeper callbacks,
            # which is exactly the problem described above
            outwriter.writerow([item['zpid'], item['sale_sold'], item['prop_price'], item['ad_title'],
                                item['prop_address'], item['prop_latitude'],
                                item['prop_longitude'], item['prop_acres'],
                                item['ad_link'], item['parcelnum'], item['lot_width']])
            csvfile.close()
        #retrieve href of next page of ads
        next_results_pg=1
        page_tags=hxs.xpath("//div[@id='list-container']/div[@id='search-pagination-wrapper-2']/ul[1]")
        while (str(page_tags.xpath(".//li["+str(next_results_pg)+"]/@class").extract())[3:-2]!='current'):
            next_results_pg+=1;
            if (next_results_pg>80):
                break
        next_results_pg+=1#;item['next_results_pg'] = next_results_pg
        if (str(page_tags.xpath(".//li["+str(next_results_pg)+"]/@class").extract())[3:-2]=='next'):return
        next_results_pg_href = prefixhref+str(page_tags.xpath(".//li["+str(next_results_pg)+"]/a/@href").extract())[3:-2]#
        if (next_results_pg_href != prefixhref):#need to also avoid launching pages otherwise not desired
            page = meowPage()
            page['next_results_pg_href'] = next_results_pg_href
            print 'Retrieving page '+ next_results_pg_href
#           yield Request(next_results_pg_href, dont_filter=True, callback = self.parse)
        return
#       if (item['next_results_pg_href']==prefixhref):
#           print 'No results pages found after this one, next+results_pg='+str(next_results_pg)
#       else:
#           print 'Next page to parse after this one is '+str(item['next_results_pg_href'])

    def getthispage(self, response): 
        #Even though the yield statement was used, 
        #nothing here really gets executed until
        #until the first parse function resumes and 
        #then finishes completely.
        return 

Answer

My solution using the standard SQLite3 that is packaged with Python 2.7:

#  items.py contents:
#from scrapy.item import Item, Field

#class TrackItemScrapeItem(Item):
    # define the fields for your item here like:
#    f_1 = Field()
#    f_2 = Field()
#    sale_sold = Field()
#    price = Field()
#    item_ad_link = Field()
#    .and so on
#    .
#    .
#    .
#    <EOF>

#  TrackItemScrapespider.py contents:
from scrapy.spider import Spider
from scrapy.selector import Selector
from scrapy.contrib.spiders import CrawlSpider, Rule
from scrapy.contrib.exporter import CsvItemExporter
import csv
import sys
from zillow.items import TrackItemScrapeItem
from scrapy.http import Request
import os
import sqlite3 #allows scrapy asynchronous processes a global storage place for each item's scraped info
import time

class TrackItemScraper(Spider):
    name="buyitemslist"
    start_urls = ['http://www.buythisandthat.com']
    tablecolumns=""
    prikeyfldname='f_1'
    for field in getattr(TrackItemScrapeItem, 'fields'):
        # Just realize the order of these fields in the database has no relation to the order in items.py
        # nor is it consistent each time
        if (field==prikeyfldname):
            tablecolumns+=str(field)+' TEXT PRIMARY KEY NOT NULL, '
        else:
            tablecolumns+=str(field)+' TEXT, '
    tablecolumns=tablecolumns[:-2]
    con=None
    con=sqlite3.connect(name+".db");tablename='CrawlResults'
    if (con==None):
        print "SQL database not getting opened by sqlite3 !  (Is there room for the file ?)";sys.exit(1)
    cur=con.cursor()
    try:
        cur.execute('select * from '+tablename)
        cur.execute('PRAGMA table_info('+tablename+')')
        data = cur.fetchall()
        for d in data:
            print d[0], d[1], d[2]
        cur.execute('select * from '+tablename)
        print '\n'+str(cur.fetchall())
    except:
        cur.execute('DROP TABLE IF EXISTS '+tablename);cur.execute('CREATE TABLE '+tablename+' ('+tablecolumns+')')
    if (raw_input('\n\n Do you want to delete the previous '+name+'.CSV file?').capitalize()=='Y'):
        csvfile=name+'.csv'
        with open(csvfile, 'w') as csv_file:
            csv_file.close()
    if (raw_input('\n\n Do you want to save the results from the previous run to a new '+name+'.CSV file?').capitalize()=='Y'):
        csvfile=name+'.csv'
        with open(csvfile, 'w') as csv_file:
            csv_writer=csv.writer(csv_file)
            cur.execute('select * from '+tablename)  # run the select first so cur.description matches this table
            csv_writer.writerow([i[0] for i in cur.description])
            csv_writer.writerows(cur)
            csv_file.close()
    if (raw_input('\n\n Do you want to clear out previous results from memory now to start clean?  Answer no ONLY if you haven\'t added new fields!').capitalize()=='Y'):
        cur.execute('DROP TABLE IF EXISTS '+tablename);cur.execute('CREATE TABLE '+tablename+' ('+tablecolumns+')')
    instancesrunning=1#start at 1 because a yield is about to happen implicitly for an asynchronous instance
#    seccntdwn=25#for failsafe counter, can used with next-to-last (for example) parse instance to tolerate an instance collision decrementing instancesrunning counter

    def parse(self, response):#recursive but first entry won't have meta args
        try:
            pageschema = response.meta['pageschema']
        except:#entered function without meta args, pageschema 0
            pageschema = 0
        hxs = Selector(response)
        if (pageschema==0):#top level pages
            ad_list=hxs.xpath("//xpath_to_ad_list")
        #    page_tags=''

            for item_ad in ad_list:
                item = TrackItemScrapeItem()
                # parse code for prikeyfldname field must be in here instead of these comment lines
                #item[prikeyfldname]=item_ad.xpath(".//div[whatever...
                # for this example, item['sale_sold'] and item['price'] will need parsing code in here as well
                with sqlite3.connect(self.name+".db") as con:
                    cur=con.cursor()
                    replacevar = False
                    try:
                        cmd='INSERT INTO ' +str(self.tablename)+ ' (' +str(self.prikeyfldname)+ ') VALUES (\'' +str(item[self.prikeyfldname])+ '\')'
                        cur.execute(cmd)
                        print str(cmd) #won't see this unless insert succeeded 
                        con.commit()
                        #replacevar is for any fields for this item that you might want to keep old contents of, appending current info only when this item was not new in this run
                        replacevar = True
                        self.storthis(str(item[self.prikeyfldname]), 'sale_sold', str(item['sale_sold']), replace=replacevar)
                    except:#option example: if wanting to save old price, moves it into sale_sold if sale_sold field has changed
                        cmd='UPDATE ' +self.tablename+ ' SET sale_sold = \''  +str(item['sale_sold'])+ ', last retrieved advertised price was \' || (SELECT price) WHERE ' +str(self.prikeyfldname)+ ' = ' +str(item[self.prikeyfldname])+ ' AND sale_sold <> \'' +str(item['sale_sold'])+ '\''
                        print str(cmd)#prints even if SQL update fails for troubleshooting
                        cur.execute(cmd)
                        con.commit()
                #now storthis code for price field because we already parsed it
                self.storthis( str(item[self.prikeyfldname]),'price',item['price'], replace=True)
                #
                #remainder of parses and stores in here for this item, saving the ad_link for next yield/Request
                #
                self.instancesrunning+=1    
                yield Request(str(item['ad_link']), meta={'item':item, 'pageschema':1}, dont_filter=True, callback = self.parse)
            #code in here to determine link for next page of list of ads
            if (final_page_of_list_is_complete):
                self.savetofileiflastinstance()#the way to exit all callback instances
                return
            #parse link to next page in here if not done already
            time.sleep(6)#being nice to host
            self.instancesrunning+=1
            yield Request(next_results_pg_href, dont_filter=True, callback = self.parse)#don't need meta for list pages
        elif (pageschema==1): #we are to parse using 2nd schema
            item = response.meta['item']
            page_tags = hxs.xpath("//xpath_as_needed_for_this_schema")
            #parse and store in here, just remember in this example that we didn't pass along in meta whether this item is new or not this run
            # for every link on this page that needs scraping (adjust pageschema for each link type):
            for link in links_needing_scraping:  # placeholder list, built by the parsing above
                self.instancesrunning+=1    
                yield Request(str(item['ad_link']), meta={'item':item, 'pageschema':as_needed}, dont_filter=True, callback = self.parse)
        elif (pageschema==2): pass  #same general code in here as for schema 1
        elif (pageschema==3): pass  #same general code in here as for schema 1
        elif (pageschema==4): pass  #same general code in here as for schema 1
        self.savetofileiflastinstance()
        return

    def storthis (self, uniquefieldvalue, storfieldname, storfieldvalue, replace):
#       check for nulls etc in here, if desired  
        con=None
        con=sqlite3.connect(self.name+".db")
        if (replace==False):
            cmd='UPDATE '+str(self.tablename)+' SET '+storfieldname+' = (SELECT '+storfieldname+') || \''+storfieldvalue+'\' WHERE '+self.prikeyfldname+'=\''+uniquefieldvalue+'\''
        else:
            cmd='UPDATE '+str(self.tablename)+' SET '+storfieldname+'=\''+storfieldvalue+'\' where '+self.prikeyfldname+'=\''+uniquefieldvalue+'\''
        print str(cmd)
        try:
            with con:
                cur=con.cursor()
                cur.execute(cmd)
                con.commit()
        except:pass  ##we don't want to return with an unexpected error
        return 

    def savetofileiflastinstance(self):
#        instancesrunningpre=self.instancesrunning #if utilizing the failsafe instance counting
        self.instancesrunning-=1
        if (self.instancesrunning>0):pass #or failsafe counter code in here
#            #if concerned that this might still be the last if an instance collision ever occurred while decrementing self.instancesrunning
#             we'll wait seccntdwn seconds for another instance to have its presence indicated by a change in the counter
#            if ():
#                self.savetofileiflastinstance()
        else:   #this was last instance, write the csv file
            con=None
            con=sqlite3.connect(self.name+".db")
            cur=con.cursor()
            csvfile=self.name+'.csv'
            with open(csvfile, 'a') as csv_file:
                cur.execute('select * from '+self.tablename)
                csv_writer=csv.writer(csv_file)
                csv_writer.writerow([i[0] for i in cur.description])
                csv_writer.writerows(cur)
                csv_file.close()
            print '\n\nWrote the results to the '+self.name+'.CSV file.  Make sure this is the LAST line of output from this script!  If it isn\'t, rewrite the source coding for instance tracking.\n\n'
            con.commit()
        return
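
A side note on the hand-rolled instancesrunning counter: Scrapy fires a spider_closed signal after the last scheduled request and callback have finished, and a spider can receive it simply by defining a closed(reason) method. The final SQLite-to-CSV dump could hang off that hook instead of counting callback instances. A minimal sketch of that variant, assuming the same name, tablename and database file as in the answer's spider:

    # inside the TrackItemScraper class; Scrapy calls this once, when the crawl ends
    def closed(self, reason):
        con = sqlite3.connect(self.name + ".db")
        cur = con.cursor()
        cur.execute('select * from ' + self.tablename)
        with open(self.name + '.csv', 'a') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow([col[0] for col in cur.description])
            csv_writer.writerows(cur)
        con.close()

With that in place, the per-callback counting and the savetofileiflastinstance() bookkeeping should no longer be needed.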
