Recurrent machine learning ETL using Luigi


Question

Today, running the machine learning job I've written is done by hand: I download the needed input files, train and predict, output a .csv file, and then copy it into a database.

However, since this is going into production, I need to automate the whole process. The needed input files will arrive every month (and eventually more frequently) in an S3 bucket from the provider.

Now I'm planning to use Luigi to solve this problem. Here is the ideal process:

  • Every week (or day, or hour, whichever I decide works best), my program needs to watch the S3 bucket for new files
  • When a file arrives, my machine learning pipeline is triggered and spits out some pandas DataFrames
  • After that, my program needs to write those results into different databases

The problem is, I don't know how to use Luigi to automate:

  1. Watching for the files
  2. Scheduling the task (e.g. monthly)
  3. Deploying it in a repeatable way

Today, here is the pipeline skeleton that I have in mind:

import luigi
from luigi.contrib.hdfs import HdfsTarget  # HdfsTarget lives under luigi.contrib in current Luigi

from mylib import ml_algorithm
from mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_done

class Extract(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()
    def requires(self):
        pass
    def output(self):
        return HdfsTarget(self.date.strftime('data/%Y_%m_%d_') + self.filename)
    def run(self):
        data = read_s3(self.s3_path + '/' + self.filename)
        with self.output().open('w') as hdfs_file:
            write_hdfs(hdfs_file, data)


class Transform(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    filename = luigi.Parameter()
    def requires(self):
        return Extract(self.date, self.s3_path, self.filename)
    def output(self):
        return HdfsTarget(self.date.strftime('data/results/%Y_%m_%d_') + self.filename)
    def run(self):
        with self.input().open('r') as inputfile:
            data = read_hdfs(inputfile)
        result = ml_algorithm(data)
        with self.output().open('w') as outputfile:
            write_hdfs(outputfile, result)
        mark_as_done(self.filename)



class Load(luigi.Task):
    date = luigi.DateParameter()
    s3_path = luigi.Parameter()
    def requires(self):
        return [Transform(self.date, self.s3_path, filename) for filename in new_files(self.s3_path)]
    def output(self):
        # Fake DB target, just for illustrative purposes
        return DBTarget('...')
    def run(self):
        for target in self.input():
            with target.open('r') as inputfile:
                result = read_hdfs(inputfile)
            # again, just for didactic purposes
            db = self.output().connection
            write_db(db, result)
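
The helpers imported from mytools do the heavy lifting here. As a guess at what new_files and mark_as_done might look like, here is a minimal sketch using boto3, assuming an 'bucket-name/prefix' path layout and a ".done" marker-object convention (the layout, the marker scheme, and the bucket parsing are all assumptions, not part of the question):

import boto3

def new_files(s3_path):
    # Assumes s3_path looks like 'bucket-name/prefix'.
    bucket_name, _, prefix = s3_path.partition('/')
    bucket = boto3.resource('s3').Bucket(bucket_name)
    keys = [obj.key for obj in bucket.objects.filter(Prefix=prefix)]
    done = {key for key in keys if key.endswith('.done')}
    # A file counts as "new" if no matching marker object exists yet.
    return [key for key in keys
            if not key.endswith('.done') and key + '.done' not in done]

def mark_as_done(filename, bucket_name='my-bucket'):  # hypothetical bucket name
    # Drop an empty marker object next to the processed file.
    boto3.resource('s3').Object(bucket_name, filename + '.done').put(Body=b'')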

Then I would add this to crontab and simply wrap it in a Docker container.

Questions:

  • Is this the correct pattern that people use to do this? Is there a better way to do it?
  • If I had Transform1 (that depends on the input data) and Transform2 (that depends on Transform1 result) and wanted to save both results into different DBs, how could one implement this using a Luigi pipeline (also in this context of watching files)?
  • Do people use something different than cron for this?
  • How to containerize this properly?

Answer

Your pattern looks largely correct. I would start by using a cron job to call a script that triggers the Load task pipeline. It looks like this Load task already verifies the existence of new files in the S3 bucket, but you would have to change the output to also be conditional, which could be a status file or something else, for the case where there is nothing to do. You could also do this in a higher-level WrapperTask (with no output) that requires the Load task only when there are new files. You could then use this WrapperTask to require two different Load tasks, which would in turn require your Transform1 and Transform2 respectively.
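
A minimal sketch of that wrapper, under the assumption that new_files from the question returns the list of unprocessed files (the task names AllLoads, Load1 and Load2 are hypothetical stand-ins for Load tasks wrapping Transform1 and Transform2):

import datetime
import luigi

from mytools import new_files

class AllLoads(luigi.WrapperTask):
    # Top-level task for cron to trigger; a WrapperTask has no output
    # and is complete once all of its requirements are.
    date = luigi.DateParameter(default=datetime.date.today())
    s3_path = luigi.Parameter()

    def requires(self):
        # Require the load pipelines only when there is something new to do.
        if new_files(self.s3_path):
            yield Load1(self.date, self.s3_path)  # requires Transform1
            yield Load2(self.date, self.s3_path)  # requires Transform2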

Adding in containers... what my cron really calls is a script that pulls my latest code from git, builds a new container if necessary, and then calls docker run. I have another container that is always up, running luigid. The daily docker run executes a shell script in the container via CMD, which calls the luigi task with the parameters needed for that day.
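
To make that concrete, the script the container's CMD runs could be a small Python entrypoint that builds the top-level task with today's parameters (AllLoads is the hypothetical wrapper sketched above, and the 'luigid' scheduler host assumes the always-up scheduler container is reachable under that name):

import datetime
import luigi

from mypipeline import AllLoads  # hypothetical module containing the wrapper task

if __name__ == '__main__':
    luigi.build(
        [AllLoads(date=datetime.date.today(), s3_path='my-bucket/incoming')],
        scheduler_host='luigid',  # or local_scheduler=True if you don't run luigid
    )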

