Make failure of a dynamic Luigi task non-critical

Problem description

I have a Luigi workflow that downloads a bunch of large files via FTP and deposits them on S3.

One task reads the list of files to download, then creates a set of tasks that actually perform the downloads.

The idea is that the result of this workflow is a single file listing the downloads that succeeded, with any failed downloads being reattempted on the next run the following day.

The problem is that if any of the download tasks fails, the list of successful downloads is never created.

This is because the dynamically created tasks become requirements of the main task that creates them and compiles the list from their outputs.

Is there a way to make failures of these download tasks insignificant, so that the list is compiled minus the outputs of the failed tasks?

Example code is below; GetFiles is the task that we call from the command line.

import json

import luigi
from luigi.contrib.s3 import S3Client, S3Target
from luigi.util import requires


class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(self.sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)


@requires(GetListOfFileToDownload)
class GetFiles(luigi.Task):

    def run(self):
        with self.input().open('r') as fileList:
            files = json.load(fileList)

            tasks = []
            taskOutputs = []

            for file in files:
                task = DownloadFileFromFtp(sourceUrl=file["ftpUrl"])
                tasks.append(task)
                taskOutputs.append(task.output())

            # Dynamic dependencies: GetFiles cannot proceed until every yielded task is complete.
            yield tasks

            successfulDownloads = MakeSuccessfulOutputList(taskOutputs)

        with self.output().open('w') as out:
            json.dump(successfulDownloads, out)

    def output(self):
        client = S3Client()
        return S3Target(path='successfulDownloads.json', client=client)
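
The helper MakeSuccessfulOutputList is not shown in the question. As a rough sketch of what "the list minus the output of the failed tasks" could mean, the function below keeps only the outputs that actually exist. It assumes each output behaves like a Luigi file-system target, with an exists() method and a path attribute. The names make_successful_output_list and LocalFileTarget are hypothetical; LocalFileTarget is only a stand-in so the sketch runs without Luigi or S3.

```python
import os


def make_successful_output_list(task_outputs):
    """Return the paths of the outputs that exist, skipping failed downloads."""
    return [target.path for target in task_outputs if target.exists()]


class LocalFileTarget:
    """Hypothetical stand-in for S3Target: same exists()/path surface, local disk only."""

    def __init__(self, path):
        self.path = path

    def exists(self):
        return os.path.exists(self.path)
```

A filter like this only helps once GetFiles.run is allowed to continue past the yield, which is exactly what the question is asking for.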

Recommended answer

THIS ANSWER IS PROBABLY INCORRECT - CHECK THE COMMENTS

I have read the documentation a few times and found no mention of anything like non-critical failures. That said, this behavior could be achieved by overriding the Task.complete method in DownloadFileFromFtp, while still being able to use DownloadFileFromFtp.output in GetFiles.run.

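By overriding complete to return True, the DownloadFileFromFtp task is treated as complete regardless of whether the download succeeded. Note that Luigi also calls complete() before deciding whether to run a task, so a task that always reports itself complete will not be run at all.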

class DownloadFileFromFtp(luigi.Task):
    sourceUrl = luigi.Parameter()

    def run(self):
        with self.output().open('w') as output:
            WriteFileFromFtp(self.sourceUrl, output)

    def output(self):
        client = S3Client()
        return S3Target(path=someOutputPath, client=client, format=luigi.format.Nop)

    def complete(self):
        # Always report the task as complete, whether or not the output exists.
        return True

Note, however, that you could also use more complex logic in the complete method, such as failing only if the task hit a specific network failure at runtime.
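
One possible shape for that more complex logic is sketched below. It is an assumption-laden illustration, not a tested Luigi recipe. To stay runnable without Luigi or S3 it uses a plain class with the same run()/complete() shape and local files; in a real workflow the class would subclass luigi.Task. The names TolerantDownload, fetch, run_date, and the ".failed" marker file are all hypothetical. run() catches network-style errors and records a marker instead of raising, and complete() returns True when either the downloaded file or the marker exists.

```python
import os


class TolerantDownload:
    """Plain-Python stand-in for a Luigi task with the same run()/complete() shape."""

    def __init__(self, source_url, out_dir, run_date, fetch):
        self.source_url = source_url
        self.fetch = fetch  # hypothetical callable: url -> bytes, may raise
        name = source_url.rsplit("/", 1)[-1]
        self.data_path = os.path.join(out_dir, name)
        # The marker is scoped to one run date so that a later run retries the download.
        self.failed_path = os.path.join(out_dir, f"{name}.{run_date}.failed")

    def run(self):
        try:
            payload = self.fetch(self.source_url)
        except OSError as exc:
            # Tolerated failure (OSError covers ConnectionError and TimeoutError):
            # record it and return normally instead of failing the task.
            with open(self.failed_path, "w") as marker:
                marker.write(str(exc))
            return
        with open(self.data_path, "wb") as out:
            out.write(payload)

    def succeeded(self):
        # True only when the real download output is present.
        return os.path.exists(self.data_path)

    def complete(self):
        # Complete if the file was downloaded OR a tolerated failure was recorded.
        return self.succeeded() or os.path.exists(self.failed_path)
```

Any exception other than OSError still propagates from run(), so unexpected errors remain critical. The parent task can then build its list from the tasks whose succeeded() returns True.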
