Luigi Pipeline beginning in S3
Question
My initial files are in AWS S3. Could someone point me to how I need to set this up in a Luigi Task?
I reviewed the documentation and found luigi.S3, but it is not clear to me what to do with it. I then searched the web and only found links to mortar-luigi and to implementations built on top of Luigi.
Update
After following the example provided by @matagus (I also created the ~/.boto file as suggested):
    # coding: utf-8
    import luigi
    from luigi.s3 import S3Target, S3Client

    class MyS3File(luigi.ExternalTask):
        def output(self):
            return S3Target('s3://my-bucket/19170205.txt')

    class ProcessS3File(luigi.Task):
        def requieres(self):
            return MyS3File()

        def output(self):
            return luigi.LocalTarget('/tmp/resultado.txt')

        def run(self):
            result = None
            for input in self.input():
                print("Doing something ...")
                with input.open('r') as f:
                    for line in f:
                        result = 'This is a line'
            if result:
                out_file = self.output().open('w')
                out_file.write(result)
When I execute it, nothing happens:
DEBUG: Checking if ProcessS3File() is complete
INFO: Informed scheduler that task ProcessS3File() has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) running ProcessS3File()
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) done ProcessS3File()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task ProcessS3File() has status DONE
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) was stopped. Shutting down Keep-Alive thread
As you can see, the message "Doing something ..." never prints. What is wrong?
Recommended answer
The key here is to define an External Task that has no inputs and whose outputs are the files you already have living in S3. The Luigi docs mention this in Requiring another Task:
Note that requires() can not return a Target object. If you have a simple Target object that is created externally you can wrap it in a Task class
So, basically you end up with something like this:
    import luigi
    # note: newer Luigi releases expose this module as luigi.contrib.s3
    from luigi.s3 import S3Target
    # placeholder: replace with your own processing function
    from somewhere import do_something_with

    class MyS3File(luigi.ExternalTask):
        # wraps a file that already exists in S3; it has no requirements of its own
        def output(self):
            return S3Target('s3://my-bucket/path/to/file')

    class ProcessS3File(luigi.Task):
        def requires(self):
            return MyS3File()

        def output(self):
            return S3Target('s3://my-bucket/path/to/output-file')

        def run(self):
            result = None
            # this will return a file stream that reads the file from your aws s3 bucket
            with self.input().open('r') as f:
                result = do_something_with(f)
            # and then you write the result; leaving the with block closes the file so the write is completed
            with self.output().open('w') as out_file:
                # it'd be better to serialize this result before writing it to a file, but this is a pretty simple example
                out_file.write(result)
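One difference from the code in the question is worth spelling out: the method there is named requieres, not requires. As far as I understand Luigi, a task's inputs are derived from whatever requires() returns, and the default is an empty list, so a misspelled method is simply never called. That would explain why only ProcessS3File shows up in the log and why the loop over self.input() never runs. The sketch below illustrates the idea with a tiny stand-in class; MiniTask, Source, Misspelled and Correct are made-up names for illustration and this is not Luigi's actual implementation.

```python
class MiniTask:
    """Simplified stand-in (an assumption, not Luigi's real code) showing
    how a task's inputs are derived from requires()."""

    def requires(self):
        # Default when a subclass does not override it: no dependencies.
        return []

    def output(self):
        return []

    def input(self):
        # Inputs are the outputs of whatever requires() returns.
        deps = self.requires()
        if isinstance(deps, MiniTask):
            return deps.output()
        return [dep.output() for dep in deps]


class Source(MiniTask):
    def output(self):
        return "s3://my-bucket/19170205.txt"


class Misspelled(MiniTask):
    def requieres(self):  # typo: nothing ever calls this method
        return Source()


class Correct(MiniTask):
    def requires(self):
        return Source()


if __name__ == "__main__":
    # The misspelled task has no inputs, so a loop over them never executes.
    print(Misspelled().input())
    # The correctly spelled task sees the output of its dependency.
    print(Correct().input())
```

With the method renamed to requires, self.input() in the question's code would be a single target rather than a list, so it can be opened directly as in the example above instead of being iterated over.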
Update:
Luigi uses boto to read files from and/or write them to AWS S3, so in order to make this code work, you'll need to provide your credentials in your boto config file ~/.boto (the boto documentation lists other possible config file locations):
[Credentials]
aws_access_key_id = <your_access_key_here>
aws_secret_access_key = <your_secret_key_here>
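If you want to sanity-check the file before running the pipeline, something like the following could help. It is only a sketch using the standard library's configparser: the helper name missing_credentials is made up for this example, and it checks only that the [Credentials] section and the two keys shown above are present and non-empty, not whether boto or AWS will accept them.

```python
import configparser

# Same layout as the config shown above; the values are placeholders.
SAMPLE = """\
[Credentials]
aws_access_key_id = <your_access_key_here>
aws_secret_access_key = <your_secret_key_here>
"""

REQUIRED_KEYS = ("aws_access_key_id", "aws_secret_access_key")


def missing_credentials(config_text):
    """Return the required keys that are absent or empty in config_text.

    Hypothetical helper: it validates the shape of the file only.
    """
    # Disable interpolation so a literal '%' in a value is not misread.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(config_text)
    if not parser.has_section("Credentials"):
        return list(REQUIRED_KEYS)
    return [
        key
        for key in REQUIRED_KEYS
        if not parser.get("Credentials", key, fallback="").strip()
    ]


if __name__ == "__main__":
    # An empty list means both keys were found.
    print(missing_credentials(SAMPLE))
```

To check your real file, read its contents (for example from the ~/.boto path mentioned above) and pass the text to the helper.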