How to prevent "Execution failed:[Errno 32] Broken pipe" in Airflow


Problem Description

I just started using Airflow to coordinate our ETL pipeline.

I encountered the broken pipe error when running a DAG.

I've seen a general Stack Overflow discussion here.

My case is more on the Airflow side. According to the discussion in that post, the possible root cause is:



The broken pipe error usually occurs if your request is blocked or takes too long; after the request-side timeout, it closes the connection, and then, when the response side (server) tries to write to the socket, it throws a broken pipe error.

This might be the real cause in my case: I have a PythonOperator that starts another job outside of Airflow, and that job can be very lengthy (i.e. 10+ hours). I wonder what mechanism Airflow has in place that I can leverage to prevent this error.
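
For reference, a minimal sketch of this kind of setup (the DAG name, schedule, and script path are illustrative, not the actual job):

from datetime import datetime
import subprocess

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def run_external_job():
    # Launch the external ETL job and block until it finishes,
    # which can take 10+ hours; the script path is a placeholder
    subprocess.check_call(["/path/to/run_pipeline.sh"])

with DAG("etl_pipeline", start_date=datetime(2019, 3, 1),
         schedule_interval=None) as dag:
    long_job = PythonOperator(task_id="run_external_job",
                              python_callable=run_external_job)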

Can anyone help?

UPDATE1 20190303-1:

Thanks to @y2k-shubham for the SSHOperator. I am able to use it to set up an SSH connection successfully and to run some simple commands on the remote site (indeed, the default SSH connection has to be set to localhost because the job is on localhost), and I can see the correct output of hostname and pwd.
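
For reference, a minimal sketch of the sanity check that worked at this point (the DAG wiring is illustrative; ssh_default points at localhost as described):

from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator

with DAG("ssh_sanity_check", start_date=datetime(2019, 3, 3),
         schedule_interval=None) as dag:
    # Simple commands over the localhost SSH connection
    check = SSHOperator(task_id="check_host",
                        ssh_conn_id="ssh_default",
                        command="hostname; pwd")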

However, when I attempted to run the actual job, I received the same error; again, the error comes from the pipeline job itself rather than the Airflow DAG/task.

UPDATE2: 20190303-2

I had a successful run (airflow test) with no error, followed by another failed run (scheduler) with the same error from the pipeline.

Recommended Answer

While I'd suggest you keep looking for a more graceful way of achieving what you want, I'm putting up example usage as requested.

First you've got to create an SSHHook. This can be done in two ways:


  • The conventional way, where you supply all requisite settings like host, user, and password (if needed) from the client code where you are instantiating the hook. I'm citing an example from test_ssh_hook.py here, but you should go through SSHHook as well as its tests thoroughly to understand all possible usages.


from airflow.contrib.hooks.ssh_hook import SSHHook

# The values below are placeholders from the test file; substitute your own
ssh_hook = SSHHook(remote_host="remote_host",
                   port="port",
                   username="username",
                   timeout=10,
                   key_file="fake.file")



  • The Airflow way, where you put all connection details inside a Connection object that can be managed from the UI, and pass only its conn_id to instantiate your hook.


    ssh_hook = SSHHook(ssh_conn_id="my_ssh_conn_id")
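
    As a side note, such a Connection can also be supplied through an environment variable instead of the UI; a minimal sketch, with illustrative URI details (user, host, port):

    import os

    # Airflow resolves connections from AIRFLOW_CONN_<CONN_ID> environment
    # variables given in URI form; the value below is a placeholder
    os.environ["AIRFLOW_CONN_MY_SSH_CONN_ID"] = "ssh://user@localhost:22"

    In practice you would export this variable in the scheduler/worker environment rather than setting it from Python.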
    


    Of course, if you're relying on SSHOperator, you can directly pass the ssh_conn_id to the operator.


    ssh_operator = SSHOperator(task_id="my_ssh_task",        # task_id and command are
                               command="echo -n airflow",    # required by the operator
                               ssh_conn_id="my_ssh_conn_id")
    



  • Now if you're planning to have a dedicated task for running a command over SSH, you can use SSHOperator. Again, I'm citing an example from test_ssh_operator.py, but go through the sources for a better picture.


     task = SSHOperator(task_id="test",
                        command="echo -n airflow",
                        dag=self.dag,    # from the test harness; pass your own DAG object
                        timeout=10,
                        ssh_conn_id="ssh_default")
    


    But then you might want to run a command over SSH as part of a bigger task. In that case, you don't want an SSHOperator; you can still use just the SSHHook. The get_conn() method of SSHHook gives you an instance of paramiko's SSHClient, with which you can run a command using the exec_command() call:


    # Obtain a paramiko SSHClient from the hook (conn id is illustrative)
    ssh_client = SSHHook(ssh_conn_id="my_ssh_conn_id").get_conn()

    my_command = "echo airflow"
    # Request a pseudo-terminal for sudo; timeout sets the channel's socket timeout
    stdin, stdout, stderr = ssh_client.exec_command(
      command=my_command,
      get_pty=my_command.startswith("sudo"),
      timeout=10)
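
    Since exec_command() returns immediately, a brief follow-up sketch: blocking on the command's exit status (paramiko's standard API) makes failures of long-running commands visible in the task:

    # Block until the remote command completes, then check its exit code
    exit_status = stdout.channel.recv_exit_status()
    if exit_status != 0:
        raise RuntimeError("Remote command failed: %s" % stderr.read())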
    


    If you look at the execute() method of SSHOperator, it is a fairly complicated (but robust) piece of code that tries to achieve a very simple thing. For my own usage I've created a few snippets that you might want to look at:


    • For using SSHHook independently of SSHOperator, have a look at ssh_utils.py
    • For an operator that runs multiple commands over SSH (you can achieve the same thing by chaining commands with bash's && operator, as in the sketch below), see MultiCmdSSHOperator
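
    For illustration, a minimal sketch of the &&-chaining alternative mentioned above (DAG name, conn id, paths, and commands are placeholders):

    from datetime import datetime
    from airflow import DAG
    from airflow.contrib.operators.ssh_operator import SSHOperator

    with DAG("multi_cmd_demo", start_date=datetime(2019, 3, 3),
             schedule_interval=None) as dag:
        # && stops the chain at the first failing step, so the task fails fast
        run_steps = SSHOperator(task_id="run_pipeline_steps",
                                ssh_conn_id="my_ssh_conn_id",
                                command="cd /opt/etl && ./extract.sh && ./load.sh")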
