Luigi Pipeline beginning in S3

Question

My initial files are in AWS S3. Could someone point me to how I need to set this up in a Luigi Task?

I reviewed the documentation and found luigi.S3, but it is not clear to me what to do with it. Then I searched the web and only found links to mortar-luigi and to implementations built on top of Luigi.

Update

After following the example provided by @matagus (I also created the ~/.boto file as suggested):

# coding: utf-8

import luigi

from luigi.s3 import S3Target, S3Client

class MyS3File(luigi.ExternalTask):
    def output(self):
        return S3Target('s3://my-bucket/19170205.txt')

class ProcessS3File(luigi.Task):

    def requieres(self):
        return MyS3File()

    def output(self):
        return luigi.LocalTarget('/tmp/resultado.txt')

    def run(self):
        result = None

        for input in self.input():
           print("Doing something ...")
           with input.open('r') as f:
               for line in f:
                   result = 'This is a line'

        if result:
            out_file = self.output().open('w')
            out_file.write(result)

When I execute it, nothing happens:

DEBUG: Checking if ProcessS3File() is complete
INFO: Informed scheduler that task   ProcessS3File()   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) running   ProcessS3File()
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) done      ProcessS3File()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ProcessS3File()   has status   DONE
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) was stopped. Shutting down Keep-Alive thread

As you can see, the message "Doing something ..." is never printed. What is wrong?

Answer

The key here is to define an ExternalTask that has no inputs and whose outputs are the files that already live in S3. The Luigi docs mention this in the section "Requiring another Task":

Note that requires() can not return a Target object. If you have a simple Target object that is created externally you can wrap it in a Task class

So, basically you end up with something like this:

import luigi

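from luigi.s3 import S3Target  # newer Luigi releases: from luigi.contrib.s3 import S3Target

from somewhere import do_something_with  # hypothetical: replace with your own processing function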


class MyS3File(luigi.ExternalTask):

    def output(self):
        return S3Target('s3://my-bucket/path/to/file')

class ProcessS3File(luigi.Task):

    def requires(self):
        return MyS3File()

    def output(self):
        return S3Target('s3://my-bucket/path/to/output-file')

    def run(self):
        result = None
        # this will return a file stream that reads the file from your aws s3 bucket
        with self.input().open('r') as f:
            result = do_something_with(f)

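        # and then write the result to the output target; closing the file
        # (done here by the with block) is what finalizes the write
        with self.output().open('w') as out_file:
            # it'd be better to serialize this result before writing it to a file, but this is a pretty simple example
            out_file.write(result)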
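Comparing the question's code with the code above points to the direct reason nothing happens: the question defines requieres instead of requires. Luigi only calls a method named requires(), so the misspelled method is never used and the inherited luigi.Task.requires() returns an empty list. self.input() is therefore empty, the for loop body never runs, and "Doing something ..." is never printed. Two more things change once the name is fixed: because requires() returns a single task, self.input() is a single target rather than a list, so it should be opened directly (as above) instead of looped over; and the output file should be closed, since LocalTarget writes to a temporary file and only moves it into place when the file is closed.

Python accepts a misspelled method name without complaint, so a small check can catch this kind of typo. The following is a minimal sketch of a hypothetical helper (not part of Luigi) that uses the standard-library difflib to flag methods whose names are close to, but not exactly, common luigi.Task hook names. The hook list and the 0.8 similarity cutoff are assumptions you may want to adjust.

```python
import difflib
import inspect

# Methods that luigi.Task subclasses commonly override (assumed list, extend as needed).
LUIGI_HOOKS = ("requires", "output", "run", "complete")


def suspicious_methods(cls, hooks=LUIGI_HOOKS, cutoff=0.8):
    """Map each method defined directly on cls whose name is close to,
    but not exactly, a hook name onto the hook it probably meant."""
    found = {}
    for name, member in vars(cls).items():
        # Skip non-functions, dunder methods, and correctly spelled hooks.
        if not inspect.isfunction(member) or name.startswith("__") or name in hooks:
            continue
        # Best match above the similarity cutoff, if any.
        match = difflib.get_close_matches(name, hooks, n=1, cutoff=cutoff)
        if match:
            found[name] = match[0]
    return found
```

Applied to the question's ProcessS3File class, it would report {'requieres': 'requires'}; the ProcessS3File class in this answer yields an empty dict.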

Update:

Luigi uses boto to read files from and write files to AWS S3, so to make this code work you'll need to provide your credentials in your boto config file, ~/.boto (see the boto documentation for other possible config file locations):

[Credentials]
aws_access_key_id = <your_access_key_here>
aws_secret_access_key = <your_secret_key_here>
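If the task still cannot reach S3, it may help to sanity-check the config file itself. The following is a minimal sketch of a hypothetical helper (not part of boto or Luigi) that parses a boto-style INI file with the standard-library configparser and reports a missing [Credentials] section, or keys that are empty or still hold the <...> placeholders shown above. It assumes the ~/.boto location used in the question, does not look at the other locations boto may read, and does not check whether AWS actually accepts the keys.

```python
import configparser
import os

# Keys this answer puts in the [Credentials] section of the boto config file.
REQUIRED_KEYS = ("aws_access_key_id", "aws_secret_access_key")


def problems_in_config(text):
    """Return a list of problems found in boto-style INI config text."""
    # interpolation=None so '%' characters in values are taken literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    if not parser.has_section("Credentials"):
        return ["missing [Credentials] section"]
    problems = []
    for key in REQUIRED_KEYS:
        value = parser.get("Credentials", key, fallback="").strip()
        # An empty value or a leftover "<...>" placeholder is not a real key.
        if not value or value.startswith("<"):
            problems.append(f"{key} is missing or still a placeholder")
    return problems


def check_boto_file(path="~/.boto"):
    """Read a boto config file from disk and report any problems."""
    full_path = os.path.expanduser(path)
    if not os.path.isfile(full_path):
        return [f"config file not found: {full_path}"]
    with open(full_path) as fh:
        return problems_in_config(fh.read())


if __name__ == "__main__":
    print(check_boto_file() or "boto credentials look filled in")
```

Splitting the parsing from the file access keeps the check easy to try on a string before pointing it at a real credentials file.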
