luigi相关内容
我正在使用 Spotify 的 Luigi 在 Python 3.6 中编写我的第一个项目,以安排一些自然语言处理任务一个管道. 我注意到 Task 类的 output() 函数总是返回某种 Target 对象,它只是某个地方的某个文件,无论是本地的还是远程的.因为我的任务会产生更复杂的数据结构,比如解析树,所以我将它们作为字符串写入文件并在之后再次读取它们非常尴尬. 所以我想问一下是
..
我有一个 luigi python 任务,其中包括一些 pyspark 库.现在我想用 spark-submit 在 mesos 上提交这个任务.我应该怎么做才能运行它?下面是我的代码框架: from pyspark.sql import 函数为 F从 pyspark 导入 SparkContext类 myClass(SparkSubmitTask):# date = luigi.DatePar
..
考虑一个任务通过动态依赖依赖另一个任务的情况: 导入路易吉from luigi import Task, TaskParameter, IntParameter类任务A(任务):父 = 任务参数()arg = 内部参数(默认值 = 0)def需要(自我):返回 self.parent()定义运行(自我):打印(f“任务 A arg = {self.arg}")类任务B(任务):arg = 内部参
..
这是我第二次尝试了解如何将参数传递给Luigi中的依赖项.第一个是此处. 想法是:我有 TaskC ,它依赖于 TaskB ,这依赖于 TaskA ,而依赖于 Task0 .我希望整个序列始终保持完全相同,只是我希望能够控制从 Task0 读取什么文件,将其称为 path .路易吉(Luigi)的哲学通常是,每个任务应该只知道它所依赖的任务及其参数.问题在于, TaskC , TaskB 和
..
请考虑以下任务: import luigi类YieldFailTaskInBatches(luigi.Task):def运行(自己):对于我在范围(5)中:屈服 [FailTask(i,j)对于范围j中的(2)]YieldAllFailTasksAtOnce(luigi.Task)类:def运行(自己):屈服 [FailTask(i,j)对于范围j中的(2)对于我在范围内(5)
..
我的团队为小型项目提供了整体服务,但为了进行重新架构和扩展,我们计划迁移到Amazon AWS的云服务,并评估编排是将Luigi作为容器任务运行还是使用AWS Step用功能代替吗?我对他们中的任何一个都没有任何经验,尤其是Luigi.任何人都可以指出他们在Luigi上遇到的任何问题,或者可以证明它比AWS更好吗?任何其他建议相同. 提前谢谢. 解决方案 我不知道AWS如何进行编排,
..
我想使用Luigi管道通过Kubernetes cronjob执行数据迁移.我的luigi任务接收到要通过cronjob命令传递的--start参数. apiVersion: batch/v1beta1 kind: CronJob metadata: name: migration spec: schedule: "0 0 */1 * *" jobTemplate: sp
..
我有一个带有新对象的存储桶,新对象会根据创建时间随机添加键.例如: 's3://my-bucket/mass/%s/%s/%s/%s/%s_%s.csv' % (time.strftime('%Y'), time.strftime('%m'), time.strftime('%d'), time.strftime('%H'), name, the_time) 实际上,这些是Scrapy爬
..
我有一个luigi工作流,该工作流通过ftp下载了一堆大文件并将它们存储在s3上. 我有一个任务,它读取要下载的文件列表,然后创建一堆实际执行下载的任务 想法是,此工作流程的结果是一个包含成功下载列表的单个文件,所有失败的下载将在第二天的下一次运行中重新尝试. 问题是,如果任何下载任务失败,则永远不会创建成功的下载列表. 这是因为动态创建的任务成为创建它们并从其输出编译列表
..
我最近实现了luigi管道来处理我们生物信息学管道之一的处理.但是,关于如何设置我不掌握的这些任务有一些基本要求. 假设我有一个由三个任务组成的链,我希望能够与多个工作人员一起运行.例如,三个工作人员的依赖关系图可能看起来像: /taskC-> taskB-> taskA -taskC-> taskB-> taskA \ taskC-> taskB-> taskA 我可能会写
..
我们有一个Luigi任务,该任务从第三方服务请求一条信息.我们限制了每分钟对该API调用可以执行的调用请求的数量. 是否可以根据每个任务指定调度程序每单位时间必须运行多少个此类任务? 解决方案 我们在任务中实现了自己的速率限制.我们的API限制足够低,我们可以用一个线程将其饱和.当我们收到速率限制响应时,我们退出并重试. 您可以做的一件事情是将API调用声明为资源.您可以在配置
..
我开始将每晚的数据管道从可视化的ETL工具移植到Luigi,我真的很高兴能有一个可视化工具来查看作业的状态.但是,我注意到上一个作业(名为MasterEnd)完成后几分钟,除MasterEnd之外,所有节点都从图中消失了.这有点不方便,因为我希望看到当天/过去的日子都完成了. 此外,如果我在可视化工具中直接转到上一个作业的URL,它将找不到它运行的任何历史记录:Couldn't find t
..
如何将参数传递给Luigi?如果我有一个名为FileFinder.py的python文件,其类名为getFIles: class getFiles(luigi.Task): ,我想将目录传递给此类,例如: C://Documents//fileName 然后在我的运行方法中使用此参数 def run(self): 如何在命令行中运行此命令并添加要在我的代码中使用
..
我正在将celery用于我的Web应用程序. Celery执行父级任务,然后再执行任务的其他子级 芹菜的问题 我无法通过luigi获得依赖图和可视化工具,以查看父任务的状态 Celery不提供重新启动失败的管道并从失败的地方开始的机制. 我可以很容易地从luigi那里得到这两件事. 所以我在想,一旦芹菜运行了父任务,然后在该任务中我执行了路易吉·皮普林(Luigi p
..
当前,我有一堆luigi任务排队在一起,并带有一个简单的依赖链(a -> b -> c -> d). d首先执行,最后a执行. a是要触发的任务. 除a之外的所有目标均返回luigi.LocalTarget()对象,并具有单个通用luigi.Parameter(),它是字符串(包含日期和时间).在luigi中央服务器(已启用历史记录)上运行. 问题是,当我重新运行所述任务a时,luig
..
我正在尝试以一种非常简单的方式来学习luigi的工作原理.就像一个新手一样,我想出了这段代码 import luigi class class1(luigi.Task): def requires(self): return class2() def output(self): return luigi.LocalTarget('class1.txt')
..
我正在使用django作为网络框架.我需要一个工作流引擎,该引擎可以执行同步以及异步(批处理任务)任务链.我找到了芹菜和路易吉作为批处理工作流程.我的第一个问题是这两个模块之间有什么区别. Luigi允许我们重新运行失败的任务链,并且仅重新执行失败的子任务.芹菜怎么样:如果我们重新运行链(修复失败的子任务代码后),它将重新运行已经成功的子任务吗? 假设我有两个子任务.第一个创建一些文件
..
我在尝试根据字典("cmdList")中定义的依赖关系创建依赖关系(子任务)时遇到上述错误.例如,"BDX010"是"BDX020"的依赖关系. Python 3.7. 请参阅底部的堆栈跟踪以获取确切的错误消息. import luigi from helpers import SQLTask import helpers import logging import time ac
..
我是气流自动化的新手,如果可以使用apache airflow(或luigi等)进行此操作,或者我只是制作一个长bash文件来执行此操作,我现在就不这样做。 我要为此构建数据 在AWS EMR上创建/克隆集群 安装python要求 安装pyspark相关库 从github获取最新代码 提交火花作业 在完成时终止群集 对于单个步骤,我可以制作.sh文件像下面这样(不确定这
..
我正在使用luigi执行一系列任务,如下所示: class Task1(luigi.Task): stuff = luigi.Parameter() def output(self): return luigi.LocalTarget('test.json') def run(self): with self.output().op
..