Unable to create SparkApplications on Kubernetes cluster using SparkKubernetesOperator from Airflow DAG
Apache Airflow version: v2.1.1
Kubernetes version (if you are using kubernetes) (use kubectl version):
Client Version: version.Info{Major:"1", Minor:"21", GitVersion:"v1.21.2", GitCommit:"092fbfbf53427de67cac1e9fa54aaa09a28371d7", GitTreeState:"clean", BuildDate:"2021-06-16T12:52:14Z", GoVersion:"go1.16.5", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"19+", GitVersion:"v1.19.8-eks-96780e", GitCommit:"96780e1b30acbf0a52c38b6030d7853e575bcdf3", GitTreeState:"clean", BuildDate:"2021-03-10T21:32:29Z", GoVersion:"go1.15.8", Compiler:"gc", Platform:"linux/amd64"}
Environment: Development
Cloud provider or hardware configuration: AWS EKS
OS (e.g. from /etc/os-release):
Kernel (e.g. uname -a):
Install tools:
Others:

What happened: I am not able to create SparkApplications on the Kubernetes cluster using SparkKubernetesOperator from an Airflow DAG. I have hosted Airflow and Spark-operator on EKS. I have created a connection on Airflow to connect to the Kubernetes cluster by using "in cluster configuration". I am running the sample application just to check the execution of Spark on Kubernetes through Airflow.
Application YAML file:-
```yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi-airflow
  namespace: spark-apps
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
```
Airflow DAG:-
```python
from datetime import timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago
# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'max_active_runs': 1,
}
# [END default_args]

# [START instantiate_dag]
dag = DAG(
    'spark_pi_airflow',
    default_args=default_args,
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
)

t1 = SparkKubernetesOperator(
    task_id='spark_pi_submit',
    namespace="spark-apps",
    application_file="example_spark_kubernetes_spark_pi.yaml",
    kubernetes_conn_id="kubernetes_default",
    do_xcom_push=True,
    dag=dag,
)

t2 = SparkKubernetesSensor(
    task_id='spark_pi_monitor',
    namespace="spark-apps",
    application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
    kubernetes_conn_id="kubernetes_default",
    dag=dag,
)

t1 >> t2
```
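For reference, the sensor's `application_name` Jinja template works because `do_xcom_push=True` makes the operator push the created SparkApplication object's body to XCom; the template then just indexes into that dictionary. A minimal sketch of that lookup (the payload below is a hypothetical, trimmed-down example of the pushed body, not the full API response):

```python
# Illustration of what the template
# "{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}"
# resolves to: xcom_pull returns the object body pushed by the operator,
# and the template picks out metadata.name.
xcom_payload = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {"name": "spark-pi-airflow", "namespace": "spark-apps"},
}

application_name = xcom_payload["metadata"]["name"]
print(application_name)  # -> spark-pi-airflow
```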
Error Message:-
```
[2021-07-12 10:18:46,629] {spark_kubernetes.py:67} INFO - Creating sparkApplication
[2021-07-12 10:18:46,662] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 174, in create_custom_object
    response = api.create_namespaced_custom_object(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 183, in create_namespaced_custom_object
    (data) = self.create_namespaced_custom_object_with_http_info(group, version, namespace, plural, body, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 275, in create_namespaced_custom_object_with_http_info
    return self.api_client.call_api(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api
    return self.__call_api(resource_path, method,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api
    response_data = self.request(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request
    return self.rest_client.POST(url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST
    return self.request("POST", url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:airflow:airflow-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 69, in execute
    response = hook.create_custom_object(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 180, in create_custom_object
    raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n")
airflow.exceptions.AirflowException: Exception when calling -> create_custom_object: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:***:***-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}
```
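The 403 Status body already names the exact subject and namespace the RBAC rules must cover. A small helper (purely illustrative, not part of Airflow or the Kubernetes client) pulls those fields out of such a response body:

```python
import json
import re

def parse_forbidden(body: str) -> dict:
    """Extract the ServiceAccount and target namespace from a Kubernetes
    403 Status body like the one in the error message above."""
    status = json.loads(body)
    msg = status["message"]
    # User "system:serviceaccount:<sa-namespace>:<sa-name>" ...
    sa = re.search(r'system:serviceaccount:([^:"]+):([^:"]+)', msg)
    ns = re.search(r'in the namespace "([^"]+)"', msg)
    return {
        "sa_namespace": sa.group(1),
        "sa_name": sa.group(2),
        "target_namespace": ns.group(1),
    }

# The same message the API server returned in the traceback above.
body = json.dumps({
    "kind": "Status", "apiVersion": "v1", "status": "Failure",
    "message": 'sparkapplications.sparkoperator.k8s.io is forbidden: '
               'User "system:serviceaccount:airflow:airflow-cluster" cannot '
               'create resource "sparkapplications" in API group '
               '"sparkoperator.k8s.io" in the namespace "spark-apps"',
    "reason": "Forbidden", "code": 403,
})
print(parse_forbidden(body))
# -> {'sa_namespace': 'airflow', 'sa_name': 'airflow-cluster', 'target_namespace': 'spark-apps'}
```

In other words: the ServiceAccount airflow-cluster in namespace airflow needs permission to create sparkapplications in namespace spark-apps, which is exactly what the RBAC below grants.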
What you expected to happen: Airflow should schedule and run the Spark job on Kubernetes using SparkKubernetesOperator.
How to reproduce it: Deploy Spark operator using helm on Kubernetes cluster. Deploy Airflow using helm on Kubernetes cluster. Deploy the above-mentioned application and Airflow DAG.
Anything else we need to know:-
I have already created a service account:
$ kubectl create serviceaccount spark
And given the service account the edit role on the cluster:
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=airflow:airflow-cluster --namespace=spark-apps
Here are the cluster role resources. Create them with kubectl -n <namespace> apply -f <filename.yaml>:
```yaml
# Role for spark-on-k8s-operator to create resources on cluster
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: spark-cluster-cr
  labels:
    rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
rules:
  - apiGroups:
      - sparkoperator.k8s.io
    resources:
      - sparkapplications
    verbs:
      - '*'
---
# Allow airflow-worker service account access for spark-on-k8s
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: airflow-spark-crb
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: spark-cluster-cr
subjects:
  - kind: ServiceAccount
    name: airflow-cluster
    namespace: airflow
```
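If you would rather not grant cluster-wide access, the same effect can be achieved with a namespaced Role and RoleBinding in spark-apps. This is a sketch; the names spark-apps-role and airflow-spark-rb are illustrative, and the verb list is a narrower alternative to '*':

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-apps-role          # illustrative name
  namespace: spark-apps
rules:
  - apiGroups: ["sparkoperator.k8s.io"]
    resources: ["sparkapplications"]
    verbs: ["create", "get", "list", "watch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-spark-rb         # illustrative name
  namespace: spark-apps
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: spark-apps-role
subjects:
  - kind: ServiceAccount
    name: airflow-cluster
    namespace: airflow
```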
Notes:
- The above assumes the error message:
  sparkapplications.sparkoperator.k8s.io is forbidden: User "system:serviceaccount:airflow:airflow-cluster" cannot create resource "sparkapplications" in API group "sparkoperator.k8s.io" in the namespace "spark-apps"
- Airflow namespace: airflow
- Airflow serviceaccount: airflow-cluster
- Spark jobs namespace: spark-apps
- You should also have spark-on-k8s-operator installed, e.g. with helm. Install it with --set webhook.enable=true if you want to use env in your spec.driver.