What am I doing wrong in this DAG setup for KubernetesPodOperator
Question
I found this example DAG and, before I attempted to add anything custom to it ... attempted to run it as is. However, the code seems to time out in my Airflow environment.

Per the documentation here I attempted to set startup_timeout_seconds to something ridiculous like 10m ... but still got the timeout message described in the documentation:

[2019-01-04 11:13:33,360] {pod_launcher.py:112} INFO - Event: fail-7dd76b92 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 6, in <module>
    exec(compile(open(__file__).read(), __file__, 'exec'))
  File "/usr/local/lib/airflow/airflow/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/airflow/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
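For what it's worth, the event log just repeats the Pending event until the operator gives up. The startup wait reduces to something like the sketch below; `wait_for_startup` and `get_phase` are hypothetical names standing in for the operator's internal poll loop, not Airflow API:

```python
import time

# Hedged sketch of the startup wait: poll the pod phase until it leaves
# "Pending", and give up once startup_timeout_seconds have elapsed.
# wait_for_startup and get_phase are invented names for illustration only.
def wait_for_startup(get_phase, startup_timeout_seconds, poll_interval=0.01):
    deadline = time.monotonic() + startup_timeout_seconds
    while get_phase() == "Pending":
        if time.monotonic() > deadline:
            raise TimeoutError("Pod took too long to start")
        time.sleep(poll_interval)

# A pod that never leaves Pending exhausts any timeout (0.05s here just
# to keep the demo fast):
try:
    wait_for_startup(lambda: "Pending", startup_timeout_seconds=0.05)
except TimeoutError as e:
    print(e)  # Pod took too long to start
```

So however large the timeout, a pod stuck in Pending ends in this same exception.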
Any input would be greatly appreciated.

Answer
"ubuntu:1604"
对于 hub.docker.com ."Python:3.6"
and "ubuntu:1604"
aren’t available docker images names for Python or Ubuntu in hub.docker.com.from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample',
    default_args=default_args,
    schedule_interval=timedelta(minutes=10))

start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                                image="python:3.6-stretch",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag
                                )

failing = KubernetesPodOperator(namespace='default',
                                image="ubuntu:16.04",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag
                                )

passing.set_upstream(start)
failing.set_upstream(start)
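A quick way to see why the original names fail: in a Docker image reference the repository name must be lowercase, while the tag part is free-form but has to actually exist on the registry. The sketch below uses a simplified regex based on Docker's reference grammar, not the full spec:

```python
import re

# Simplified approximation of Docker's image reference grammar:
# repository path components are lowercase alphanumerics separated by
# '.', '_' or '-'; a tag is up to 128 word characters, dots and dashes.
REPO_RE = re.compile(r'^[a-z0-9]+(?:[._-][a-z0-9]+)*(?:/[a-z0-9]+(?:[._-][a-z0-9]+)*)*$')
TAG_RE = re.compile(r'^[A-Za-z0-9_][A-Za-z0-9._-]{0,127}$')

def is_valid_reference(image):
    """Return True if 'repo[:tag]' is syntactically valid.

    Says nothing about whether the image actually exists on the registry.
    """
    repo, _, tag = image.partition(':')
    return bool(REPO_RE.match(repo)) and (tag == '' or bool(TAG_RE.match(tag)))

print(is_valid_reference("Python:3.6"))          # False: uppercase repository name
print(is_valid_reference("ubuntu:1604"))         # True syntactically, but no such tag is published
print(is_valid_reference("python:3.6-stretch"))  # True
```

So "Python:3.6" is rejected outright, while "ubuntu:1604" parses fine and only fails at pull time, leaving the pod stuck in Pending until the operator's startup timeout fires.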