How to run parallel instances of a Luigi pipeline: "Pid(s) set already running"

Question

I have a simple pipeline.

I want to start it once with the ID 2381 and then, while the first job is still running, start a second run with the ID 231. The first run completes as expected.

The second run returns this response:

Pid(s) set([10362]) already running
Process finished with exit code 0

I am starting the runs like this:

Run one:

luigi.run(
    cmdline_args=["--id='newId13822'", "--TaskTwo-id=2381"],
    main_task_cls=TaskTwo  # pass the task class itself, not an instance
)

Run two:

luigi.run(
    cmdline_args=["--id='newId1322'", "--TaskTwo-id=231"],
    main_task_cls=TaskTwo  # pass the task class itself, not an instance
)

Each task has a unique ID, as generated by Luigi's task_id_str(...) method. Why does Luigi think the task is already running when the luigi.Parameter values, the TaskTwo-id, and the MockTarget files are all different?

Pipeline code:

import time
import uuid

import luigi
from luigi.mock import MockTarget


class TaskOne(luigi.Task):
    run_id = luigi.Parameter()

    def output(self):
        # In-memory target; mirror_on_stderr also echoes written data to stderr.
        return MockTarget("TaskOne{0}".format(self.run_id), mirror_on_stderr=True)

    def run(self):
        # Simulate ten seconds of work, then write the output.
        with self.output().open('w') as out:
            time.sleep(10)
            out.write(u"Hello World!\n")


class TaskTwo(luigi.Task):
    # Note: this default is evaluated once, when the class is defined.
    id = luigi.Parameter(default=str(uuid.uuid4()))

    def output(self):
        return MockTarget("TaskTwo{0}".format(self.id), mirror_on_stderr=True)

    def requires(self):
        # TaskOne's run_id is taken from this task's id.
        return TaskOne(self.id)

    def run(self):
        with self.output().open('w') as out:
            time.sleep(10)
            out.write(u"Hello World!\n")

Recommended Answer

It looks like this might be because you are not connecting to a scheduler server, so it is trying to start a scheduler process twice. Are you running luigid?
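The question "Are you running luigid?" can be checked quickly from Python. The sketch below is not part of the original answer: it assumes luigid runs on the same machine and listens on its default port, 8082, and the helper name `scheduler_is_listening` is hypothetical. It only tests whether something accepts TCP connections on that port, so a positive result suggests, but does not prove, that the listener is luigid.

```python
import socket

# Assumption: luigid runs locally on its default port (8082).
DEFAULT_LUIGID_HOST = "localhost"
DEFAULT_LUIGID_PORT = 8082


def scheduler_is_listening(host=DEFAULT_LUIGID_HOST, port=DEFAULT_LUIGID_PORT, timeout=1.0):
    """Return True if something accepts TCP connections on host:port (hypothetical helper)."""
    try:
        # create_connection raises OSError (e.g. ConnectionRefusedError) if nothing listens.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    if scheduler_is_listening():
        print("Something is listening on localhost:8082 (possibly luigid).")
    else:
        print("Nothing is listening on localhost:8082; luigid does not appear to be running.")
```

Running it before and after starting luigid should show the result change from the second message to the first.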

I was able to get your code to run at the command line as follows. First, I created a directory and put your code in a file called luigitest.py (minus the luigi.run() calls). I changed into that directory. Then I ran:

luigid --background --pidfile ./luigid.pid --logdir . --state-path .

Then I opened a second terminal in the same directory. In the first one, I ran:

PYTHONPATH=. luigi --module luigitest TaskOne --run-id newId13822 --TaskTwo-id 2381 --local-scheduler

In the second one, I ran (about a second later):

PYTHONPATH=. luigi --module luigitest TaskOne --run-id newId13823 --TaskTwo-id 2382 --local-scheduler

Both of these output "Hello World!"
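To reproduce the two-terminal experiment from a single script, the two commands above can be started side by side with `subprocess`. This is a sketch under assumptions: the `luigi` command-line tool is on `PATH`, luigitest.py sits in the working directory, and the helper names `luigi_command` and `launch_parallel` are hypothetical. Each command gets its own process, roughly like typing it into its own terminal.

```python
import os
import shutil
import subprocess


def luigi_command(run_id, task_two_id, module="luigitest"):
    """Build the argument list for one of the answer's command lines (hypothetical helper)."""
    return [
        "luigi", "--module", module, "TaskOne",
        "--run-id", run_id,
        "--TaskTwo-id", task_two_id,
        "--local-scheduler",
    ]


def launch_parallel(commands, workdir="."):
    """Start every command at once, then wait for all of them; return their exit codes.

    PYTHONPATH is set to workdir, like `PYTHONPATH=.` in the answer, so that
    `--module luigitest` can be imported from that directory.
    """
    env = dict(os.environ, PYTHONPATH=os.path.abspath(workdir))
    procs = [subprocess.Popen(cmd, cwd=workdir, env=env) for cmd in commands]
    return [proc.wait() for proc in procs]


if __name__ == "__main__":
    if shutil.which("luigi") is None:
        print("The luigi command-line tool was not found on PATH.")
    else:
        codes = launch_parallel([
            luigi_command("newId13822", "2381"),
            luigi_command("newId13823", "2382"),
        ])
        print("exit codes:", codes)
```

Starting every process before waiting on any of them is what makes the runs overlap; calling `subprocess.run` in a loop would run them one after another instead.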
