Unable to create SparkApplications on Kubernetes cluster using SparkKubernetesOperator from Airflow DAG

Problem Description

Apache Airflow version: v2.1.1

Kubernetes version (if you are using kubernetes) (use kubectl version):
Client Version: version.Info{Major:"1", Minor:"21", GitVersion:"v1.21.2", GitCommit:"092fbfbf53427de67cac1e9fa54aaa09a28371d7", GitTreeState:"clean", BuildDate:"2021-06-16T12:52:14Z", GoVersion:"go1.16.5", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"19+", GitVersion:"v1.19.8-eks-96780e", GitCommit:"96780e1b30acbf0a52c38b6030d7853e575bcdf3", GitTreeState:"clean", BuildDate:"2021-03-10T21:32:29Z", GoVersion:"go1.15.8", Compiler:"gc", Platform:"linux/amd64"}

Environment: Development

Cloud provider or hardware configuration: AWS EKS
OS (e.g. from /etc/os-release):
Kernel (e.g. uname -a):
Install tools:
Others:

What happened: I am not able to create SparkApplications on the Kubernetes cluster using the SparkKubernetesOperator from an Airflow DAG. I have hosted Airflow and the Spark operator on EKS. I have created a connection in Airflow to the Kubernetes cluster using "in cluster configuration". I am running the sample application just to check the execution of Spark on Kubernetes through Airflow.
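
For reference, a connection with "in cluster configuration" enabled can also be declared through an environment variable instead of the Airflow UI. This is only a sketch: the connection URI format and the extra field name below are assumptions based on the cncf.kubernetes provider shipped with this Airflow version and may differ in other releases.

$ export AIRFLOW_CONN_KUBERNETES_DEFAULT='kubernetes://?extra__kubernetes__in_cluster=True'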

Application YAML file:-

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi-airflow
  namespace: spark-apps
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
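
Before wiring the manifest into Airflow, it can be sanity-checked directly with kubectl using your own kubeconfig identity (the filename below is assumed); this confirms the SparkApplication itself is valid, independent of the Airflow service account:

$ kubectl apply -f spark-pi-airflow.yaml -n spark-apps
$ kubectl get sparkapplications -n spark-apps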

Airflow DAG:-


from datetime import timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'max_active_runs': 1,
}
# [END default_args]

# [START instantiate_dag]

dag = DAG(
    'spark_pi_airflow',
    default_args=default_args,
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
)

t1 = SparkKubernetesOperator(
    task_id='spark_pi_submit',
    namespace="spark-apps",
    application_file="example_spark_kubernetes_spark_pi.yaml",
    kubernetes_conn_id="kubernetes_default",
    do_xcom_push=True,
    dag=dag,
)

t2 = SparkKubernetesSensor(
    task_id='spark_pi_monitor',
    namespace="spark-apps",
    application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
    kubernetes_conn_id="kubernetes_default",
    dag=dag,
)
t1 >> t2

Error Message:-


[2021-07-12 10:18:46,629] {spark_kubernetes.py:67} INFO - Creating sparkApplication
[2021-07-12 10:18:46,662] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 174, in create_custom_object
    response = api.create_namespaced_custom_object(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 183, in create_namespaced_custom_object
    (data) = self.create_namespaced_custom_object_with_http_info(group, version, namespace, plural, body, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 275, in create_namespaced_custom_object_with_http_info
    return self.api_client.call_api(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api
    return self.__call_api(resource_path, method,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api
    response_data = self.request(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request
    return self.rest_client.POST(url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST
    return self.request("POST", url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:airflow:airflow-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}



During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 69, in execute
    response = hook.create_custom_object(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 180, in create_custom_object
    raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n")
airflow.exceptions.AirflowException: Exception when calling -> create_custom_object: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:***:***-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}

What you expected to happen: Airflow should schedule and run the Spark job on Kubernetes using the SparkKubernetesOperator.

How to reproduce it: Deploy the Spark operator using Helm on the Kubernetes cluster. Deploy Airflow using Helm on the Kubernetes cluster. Deploy the above-mentioned application and Airflow DAG.
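
A minimal sketch of the Airflow part of that setup, using the official Airflow Helm chart (the release name and namespace are assumptions; the Spark operator install is sketched after the Notes in the solution below):

$ helm repo add apache-airflow https://airflow.apache.org
$ helm install airflow apache-airflow/airflow --namespace airflow --create-namespace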

Anything else we need to know:-

I have already created the service account:-

$ kubectl create serviceaccount spark

Granted the service account the edit role on the cluster:-

$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=airflow:airflow-cluster --namespace=spark-apps
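
That binding attaches the built-in edit ClusterRole, which does not necessarily cover custom resources such as sparkapplications. The effective permission of the Airflow service account can be checked with kubectl impersonation; before the fix in the solution below is applied, this typically prints "no":

$ kubectl auth can-i create sparkapplications.sparkoperator.k8s.io --as=system:serviceaccount:airflow:airflow-cluster -n spark-apps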

Solution

Here are the Kubernetes cluster role resources. Create them with kubectl -n <namespace> apply -f <filename.yaml>:

# Role for spark-on-k8s-operator to create resources on cluster
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: spark-cluster-cr
  labels:
    rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
rules:
  - apiGroups:
      - sparkoperator.k8s.io
    resources:
      - sparkapplications
    verbs:
      - '*'
---
# Allow airflow-worker service account access for spark-on-k8s
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: airflow-spark-crb
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: spark-cluster-cr
subjects:
  - kind: ServiceAccount
    name: airflow-cluster
    namespace: airflow
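
After applying the manifest (the filename below is a placeholder for whatever you saved it as), the same permission check from above should now print "yes":

$ kubectl -n spark-apps apply -f spark-rbac.yaml
$ kubectl auth can-i create sparkapplications.sparkoperator.k8s.io --as=system:serviceaccount:airflow:airflow-cluster -n spark-apps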

Notes:

  • The above assumes the error message

    sparkapplications.sparkoperator.k8s.io is forbidden: User "system:serviceaccount:airflow:airflow-cluster" cannot create resource "sparkapplications" in API group "sparkoperator.k8s.io" in the namespace "spark-apps"

    • Airflow namespace: airflow
    • Airflow serviceaccount: airflow-cluster
    • Spark jobs namespace: spark-apps
  • You should also have spark-on-k8s-operator installed
    • Install it with helm --set webhook.enable=true if you want to use env in your spec.driver (a hedged install sketch follows this list)
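
A hedged install sketch for the operator chart mentioned above (the repository URL was the chart's published location at the time; the release name, namespace, and the sparkJobNamespace value are assumptions and should be checked against the chart documentation):

$ helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
$ helm install spark-operator spark-operator/spark-operator --namespace spark-operator --create-namespace --set webhook.enable=true --set sparkJobNamespace=spark-apps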
