When a new file arrives in S3, trigger luigi task


Question

I have a bucket with new objects getting added at random intervals with keys based on their time of creation. For example:

's3://my-bucket/mass/%s/%s/%s/%s/%s_%s.csv' % (time.strftime('%Y'), time.strftime('%m'), time.strftime('%d'), time.strftime('%H'), name, the_time)

In fact, these are the outputs of Scrapy crawls. I want to trigger a task that matches these crawls to a master .csv product catalog file I have (call it "product_catalog.csv"), which also gets updated regularly.

Right now, I have several Python scripts I have written with global variables that I fill in every time I run this process. Those need to become imported attributes.
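
For illustration, here is roughly what that change looks like: a global that used to be edited by hand before each run becomes a parameter on the task that needs it (the names below are hypothetical, not the actual scripts):

import luigi

class CleanCrawl(luigi.Task):
    # What used to be module-level globals, filled in by hand each run,
    # become parameters supplied when the task is scheduled.
    crawl_key = luigi.Parameter()                                  # S3 key of one crawl output
    catalog_path = luigi.Parameter(default="product_catalog.csv")  # master catalog location

    def run(self):
        # cleaning.py's logic would read self.crawl_key and
        # self.catalog_path instead of reaching for globals.
        pass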

So here is what needs to happen:

1) New csv file shows up in "s3://my-bucket/mass/..." with a unique key name based on the time the crawl completed. Luigi sees this and begins.
2) "cleaning.py" gets run by luigi on the new file, so the parameter of "cleaning.py" (the file that showed up in S3) needs to be supplied to it at runtime. The results get saved in S3 in addition to being passed on to the next step.
3) The latest version of "product_catalog.csv" is pulled from a database, and "matching.py" matches the results of "cleaning.py" against it.
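
A note on step 1: something has to notice the new key before luigi can start. A rough polling sketch with boto3 (an assumption, the question does not say how detection happens; bucket and prefix are the ones above) could look like this, with each unseen key then handed to a luigi task as a parameter:

import boto3

def new_crawl_keys(seen_keys, bucket="my-bucket", prefix="mass/"):
    # List the crawl prefix and return keys that have not been handled yet.
    # (A single list_objects_v2 call returns at most 1000 keys; a real
    # poller would paginate.)
    s3 = boto3.client("s3")
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
    current = {obj["Key"] for obj in response.get("Contents", [])}
    return current - seen_keys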

I realize this may not make complete sense. I will supply edits as necessary to make it all more clear.

EDIT

Based on initial answers, I have configured this to be a pull operation that saves steps along the way. But now I am pretty lost. It should be noted that this is my first time tying a Python project together, so I am learning about things like __init__.py as I go. As usual, it is a bumpy road of excitement at successes, followed immediately by confusion at the next roadblock.

Here are my questions:
1) How to import the spiders from Scrapy is unclear to me. I have about a dozen of them and the goal is to have luigi manage the process of crawl>clean>match for all of them. The Scrapy documentation says to include:

class MySpider(scrapy.Spider):
    # Your spider definition

What does that mean? Re-write the spider in the script controlling the spider? That makes no sense and their examples are not helpful.
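
My best guess at what the docs intend (just a sketch, assuming my_spider actually defines a spider class called MySpider) is that the spider gets imported and handed to a CrawlerProcess rather than re-written:

from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from my_crawlers.my_crawlers.spiders.my_spider import MySpider  # existing spider, not re-written

process = CrawlerProcess(get_project_settings())
process.crawl(MySpider)   # a spider class (or its name as a string) is passed in
process.start()           # blocks until the crawl is finished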

2) I have configured Scrapy pipelines to export to S3 but luigi also seems to do this with output(). Which should I use and how do I get them to play together?
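
One way to reconcile the two (a sketch only, not a verified approach): let luigi's output() own the target path and hand that same path to Scrapy's feed export, so both point at one file. With an S3 path this assumes Scrapy's S3 feed storage (botocore plus AWS keys in the Scrapy settings) is configured:

import os
import luigi
from luigi.s3 import S3Target

class CrawlToS3(luigi.Task):
    # Hypothetical task: "spider" is the name of the Scrapy spider to run.
    spider = luigi.Parameter()
    crawltime = luigi.DateParameter()

    def output(self):
        return S3Target("s3://my-bucket/mass/%s_%s.csv" % (self.spider, self.crawltime))

    def run(self):
        # Pass luigi's target path to Scrapy's -o feed flag so the crawl
        # writes exactly where output() says the file should end up.
        os.system("scrapy crawl %s -o %s" % (self.spider, self.output().path))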

3) Luigi says that CrawlTask() ran successfully but that is wrong because it completes in seconds and the crawls usually take a few minutes. There is also no output file corresponding to success.

4) Where do I supply the credentials for S3?
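
For reference, luigi's S3Client will take the keys directly (or pick them up from its configuration); a minimal sketch with placeholder values:

from luigi.s3 import S3Target, S3Client

# Placeholder keys for illustration only; they can also live in luigi.cfg
# under an [s3] section, or come from the usual AWS environment variables.
client = S3Client(aws_access_key_id="AKIA...", aws_secret_access_key="...")
target = S3Target("s3://my-bucket/mass/some_file.csv", client=client)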

Here is my code. I have commented out things that weren't working, in favor of what I perceive to be better. But my sense is that there is a grand architecture to what I want to do that I just don't understand yet.

import luigi
from luigi.s3 import S3Target, S3Client
import my_matching
from datetime import datetime
import os
import scrapy
from twisted.internet import reactor
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
from my_crawlers.my_crawlers.spiders import my_spider

class CrawlTask(luigi.Task):
    crawltime = datetime.now()
    spider = luigi.Parameter()
    #vertical = luigi.Parameter()

    def requires(self):
        pass

    def output(self):
        return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_{}.csv".format(self.crawltime))
        #return S3Target("s3://my-bucket/mass/crawl_luigi_test_{}.csv".format(self.crawltime))

    def run(self):
        os.system("scrapy crawl %s" % self.spider)
        #process = CrawlerProcess(get_project_settings())
        #process.crawl("%s" % self.spider)
        #process.start()

class FetchPC(luigi.Task):
    vertical = luigi.Parameter()

    def output(self):
        if self.vertical == "product1":
            return "actual_data_staging/product1_catalog.csv"
        elif self.vertical == "product2":
            return "actual_data_staging/product2_catalog.csv"

class MatchTask(luigi.Task):
    crawltime = CrawlTask.crawltime
    vertical = luigi.Parameter()
    spider = luigi.Parameter()

    def requires(self):
        return CrawlTask(spider=self.spider)
        return FetchPC(vertical=self.vertical)

    def output(self):
        return luigi.LocalTarget("actual_data_staging/crawl_luigi_test_matched_{}.csv".format(self.crawltime))
        #return S3Target("s3://my-bucket/mass/crawl_luigi_test_matched_{}.csv".format(CrawlTask.crawltime))

    def run(self):
        if self.vertical == 'product1':
            switch_board(requires.CrawlTask(), requires.FetchPC())

The MatchTask refers to a Python script I wrote that compares the scraped products to my product catalog. It looks like this:

def create_search(value):
    ...

def clean_column(column):
    ...

def color_false_positive():
    ...

def switch_board(scrape, product_catalog):
    # this function coordinates the whole script
    ...

Answer

Below is a very rough outline of how it could look. I think the main difference from your outline in regards to luigi working as a pull system is that you specify the output you want first, which then triggers the other tasks upon which that output depends. So, rather than naming things with the time the crawl ends, it is easier to name things after something you know at the start. It is possible to do it the other way, just a lot of unnecessary complication.

import luigi
from luigi.s3 import S3Target

class CrawlTask(luigi.Task):
    crawltime = luigi.DateParameter()

    def requires(self):
        pass

    def get_filename(self):
        return "s3://my-bucket/crawl_{}.csv".format(self.crawltime)

    def output(self):
        return S3Target(self.get_filename())

    def run(self):
        perform_crawl(s3_filename=self.get_filename())


class CleanTask(luigi.Task):
    crawltime = luigi.DateParameter()

    def requires(self):
        return CrawlTask(crawltime=self.crawltime)

    def get_filename(self):
        return "s3://my-bucket/clean_crawl_{}.csv".format(self.crawltime)

    def output(self):
        return S3Target(self.get_filename())

    def run(self):
        perform_clean(input_file=self.input().path, output_filename=self.get_filename())


class MatchTask(luigi.Task):
    crawltime = luigi.DateParameter()

    def requires(self):
        return CleanTask(crawltime=self.crawltime)

    def output(self):
        return ##?? whatever output of this task is

    def run(self):
        perform_match(input_file=self.input().path)
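
You would then ask for the final artifact and let luigi pull in the upstream tasks, for example (assuming the tasks above live in a module called pipeline and the perform_* helpers exist):

import datetime

if __name__ == "__main__":
    # Requesting MatchTask makes luigi run CrawlTask and CleanTask first
    # whenever their targets for that crawltime do not exist yet.
    luigi.build([MatchTask(crawltime=datetime.date.today())], local_scheduler=True)

    # Equivalent command line:
    #   luigi --module pipeline MatchTask --crawltime 2017-06-01 --local-scheduler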
