How to mount a volume from an Airflow worker into a pod started by the Airflow KubernetesPodOperator?
Problem description
I am trying to use the KubernetesPodOperator in Airflow, and there is a directory on my Airflow worker that I want to share with the Kubernetes pod. Is there a way to mount the Airflow worker's directory into the Kubernetes pod?
I tried the code below, but the volume does not seem to be mounted.
import datetime
import unittest
from unittest import TestCase

from airflow.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount


class TestMailAlarm(TestCase):
    def setUp(self):
        self.namespace = "test-namespace"
        self.image = "ubuntu:16.04"
        self.name = "default"
        self.cluster_context = "default"
        self.dag_id = "test_dag"
        self.task_id = "root_test_dag"
        self.execution_date = datetime.datetime.now()
        self.context = {"dag_id": self.dag_id,
                        "task_id": self.task_id,
                        "execution_date": self.execution_date}
        self.cmds = ["sleep"]
        self.arguments = ["100"]
        self.volume_mount = VolumeMount('test',
                                        mount_path='/tmp',
                                        sub_path=None,
                                        read_only=False)
        volume_config = {
            'persistentVolumeClaim':
                {
                    'claimName': 'test'
                }
        }
        self.volume = Volume(name='test', configs=volume_config)
        self.operator = KubernetesPodOperator(
            namespace=self.namespace, image=self.image, name=self.name,
            cmds=self.cmds,
            arguments=self.arguments,
            startup_timeout_seconds=600,
            is_delete_operator_pod=True,
            # the operator could run successfully but the directory /tmp is not mounted to kubernetes operator
            volume=[self.volume],
            volume_mount=[self.volume_mount],
            **self.context)

    def test_execute(self):
        self.operator.execute(self.context)
Solution
The example in the docs is very similar to your code, except that the parameter names are plural: volume_mounts and volumes. For your code it would look like this:
        self.operator = KubernetesPodOperator(
            namespace=self.namespace, image=self.image, name=self.name,
            cmds=self.cmds,
            arguments=self.arguments,
            startup_timeout_seconds=600,
            is_delete_operator_pod=True,
            # note the plural parameter names: volumes and volume_mounts
            volumes=[self.volume],
            volume_mounts=[self.volume_mount],
            **self.context)
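As a rough illustration of what these two arguments describe, the sketch below builds the matching part of a Kubernetes pod spec using plain dicts instead of Airflow's Volume and VolumeMount objects. The helper build_pod_spec_fragment and the container name "base" are assumptions made for this example and are not part of the Airflow API. The field names (volumes, volumeMounts, mountPath, readOnly, persistentVolumeClaim, claimName) are the standard Kubernetes ones. The point it shows is that each mount refers to a volume by name, so the two names ('test' in the question) have to agree.

```python
from typing import Any, Dict, List


def build_pod_spec_fragment(
    volumes: List[Dict[str, Any]],
    volume_mounts: List[Dict[str, Any]],
    image: str = "ubuntu:16.04",
) -> Dict[str, Any]:
    """Hypothetical helper: return the volume-related part of a pod spec."""
    # Every mount must point at a volume declared on the pod, matched by name.
    declared = {volume["name"] for volume in volumes}
    for mount in volume_mounts:
        if mount["name"] not in declared:
            raise ValueError(f"mount {mount['name']!r} has no matching volume")
    return {
        # Pod-level list of volumes (here backed by a PersistentVolumeClaim).
        "volumes": volumes,
        # Container-level list of mounts; "base" is an illustrative name.
        "containers": [
            {"name": "base", "image": image, "volumeMounts": volume_mounts}
        ],
    }


# Same values as in the question: a claim named 'test' mounted at /tmp.
volume = {"name": "test", "persistentVolumeClaim": {"claimName": "test"}}
volume_mount = {"name": "test", "mountPath": "/tmp", "readOnly": False}
spec = build_pod_spec_fragment([volume], [volume_mount])
```

Since the volume here is a PersistentVolumeClaim, the pod only sees the worker's files if the worker has the same claim mounted at the directory to be shared.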