在Airflow上使用DataProOperator的组件网关 [英] Component Gateway with DataprocOperator on Airflow
本文介绍了在Airflow上使用DataProOperator的组件网关的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
在GCP中,从UI或gCloud命令安装和运行JupyterHub component相当简单。我试图通过气流和DataprocClusterCreateOperator编写这个过程的脚本,这里是DAG的摘录
from airflow.contrib.operators import dataproc_operator
create_cluster=dataproc_operator.DataprocClusterCreateOperator(
task_id='create-' + CLUSTER_NAME,
cluster_name=CLUSTER_NAME,
project_id=PROJECT_ID,
num_workers=3,
num_masters=1,
master_machine_type='n1-standard-2',
worker_machine_type='n1-standard-2',
master_disk_size=100,
worker_disk_size=100,
storage_bucket='test-dataproc-jupyter',
region='europe-west4',
zone='europe-west4-a',
auto_delete_ttl=21600,
optional_components=['JUPYTER', 'ANACONDA']
)
但是,我无法指定所需的enable-component-gateway
参数。看一下源代码,这些参数似乎不是预期的(无论是在deprecated还是last stable运算符中)。
我知道rest API提供endpointConfig.enableHttpPortAccess
,但我更愿意使用官方运算符。
有谁知道如何做到这一点吗?
推荐答案
编辑,适用于Composer-1.8.3和Airflow-1.10.3的修复
在Airflow 1.10.3中,集群配置不能从外部创建。但是,我们可以继承集群创建操作符并覆盖配置创建。这还允许我们设置可选组件,这是此气流版本中缺少的参数。
class CustomDataprocClusterCreateOperator(DataprocClusterCreateOperator):
def __init__(self, *args, **kwargs):
super(CustomDataprocClusterCreateOperator, self).__init__(*args, **kwargs)
def _build_cluster_data(self):
cluster_data = super(CustomDataprocClusterCreateOperator, self)._build_cluster_data()
cluster_data['config']['endpointConfig'] = {
'enableHttpPortAccess': True
}
cluster_data['config']['softwareConfig']['optionalComponents'] = [ 'JUPYTER', 'ANACONDA' ]
return cluster_data
#Start DataProc Cluster
dataproc_cluster = CustomDataprocClusterCreateOperator(
task_id='create-' + CLUSTER_NAME,
cluster_name=CLUSTER_NAME,
project_id=PROJECT_ID,
num_workers=3,
num_masters=1,
master_machine_type='n1-standard-2',
worker_machine_type='n1-standard-2',
master_disk_size=100,
worker_disk_size=100,
storage_bucket='test-dataproc-jupyter',
region='europe-west4',
zone='europe-west4-a',
auto_delete_ttl=21600,
dag=dag
)
原始答案,适用于气流1.10.7
虽然不是最佳的,但您可以自己创建集群数据结构,而不是让Airflow的ClusterGenerator来做这件事。它应该可以在最新版本(1.10.7)上运行cluster = {
'clusterName': CLUSTER_NAME,
'config': {
'gceClusterConfig': {
'zoneUri': 'europe-west4-a'
},
'masterConfig': {
'numInstances': 1,
'machineTypeUri': 'n1-standard-2',
'diskConfig': {
'bootDiskSizeGb': 100
},
},
'workerConfig': {
'numInstances': 3,
'machineTypeUri': 'n1-standard-2',
'diskConfig': {
'bootDiskSizeGb': 100
},
},
'softwareConfig': {
'optionalComponents': [
'ANACONDA',
'JUPYTER'
]
},
'lifestyleConfig': {
'autoDeleteTtl': 21600
},
'endpointConfig': {
'enableHttpPortAccess': True
}
},
'projectId': PROJECT_ID
}
#Start DataProc Cluster
dataproc_cluster = DataprocClusterCreateOperator(
task_id='create-' + CLUSTER_NAME,
project_id=PROJECT_ID,
num_workers=3,
region='europe-west4',
zone='europe-west4-a',
cluster = cluster,
dag=DAG
)
如果您使用的是其他气流版本,请指定该版本。
您也可以投票支持我打开的错误:AIRFLOW-6432
这篇关于在Airflow上使用DataProOperator的组件网关的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文