How to run Spark code in Airflow?


Problem Description

Hello people of the Earth! I'm using Airflow to schedule and run Spark tasks. All I have found so far are Python DAGs that Airflow can manage.

DAG example:

spark_count_lines.py
import logging

from airflow import DAG
from airflow.operators import PythonOperator

from datetime import datetime

args = {
  'owner': 'airflow'
  , 'start_date': datetime(2016, 4, 17)
  , 'provide_context': True
}

dag = DAG(
  'spark_count_lines'
  , start_date = datetime(2016, 4, 17)
  , schedule_interval = '@hourly'
  , default_args = args
)

def run_spark(**kwargs):
  import pyspark
  # Create a SparkContext, count the lines in the sample file, then shut the context down.
  sc = pyspark.SparkContext()
  df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
  logging.info('Number of lines in people.txt = {0}'.format(df.count()))
  sc.stop()

t_main = PythonOperator(
  task_id = 'call_spark'
  , dag = dag
  , python_callable = run_spark
)

The problem is that I'm not good at Python code and have some tasks written in Java. My question is how to run a Spark Java jar in a Python DAG? Or maybe there is another way to do it? I found spark-submit: http://spark.apache.org/docs/latest/submitting-applications.html
But I don't know how to connect everything together. Maybe someone has used it before and has a working example. Thank you for your time!

Recommended Answer

You should be able to use BashOperator. Keeping the rest of your code as is, import the required class and system packages:

from airflow.operators.bash_operator import BashOperator

import os
import sys

Set the required paths:

os.environ['SPARK_HOME'] = '/path/to/spark/root'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

Add the operator:

spark_task = BashOperator(
    task_id='spark_java',
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}',
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'},
    dag=dag
)
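If the Java job should run after the PySpark task from the question, the dependency can be wired explicitly. A minimal sketch; this ordering is an assumption, not part of the original answer:

t_main.set_downstream(spark_task)  # run spark-submit only after the PySpark line count task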

You can easily extend this to provide additional arguments using Jinja templates.
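For instance, a sketch that also passes the Spark master and an application argument through params (the master value and app_args here are hypothetical placeholders):

spark_task = BashOperator(
    task_id='spark_java_with_args',
    # Render spark-submit options and application arguments from Jinja params.
    bash_command='spark-submit --class {{ params.class }} --master {{ params.master }} {{ params.jar }} {{ params.app_args }}',
    params={
        'class': 'MainClassName',    # hypothetical main class
        'master': 'local[*]',        # hypothetical Spark master URL
        'jar': '/path/to/your.jar',  # hypothetical path to the application jar
        'app_args': '10'             # hypothetical argument passed to the jar
    },
    dag=dag
)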

You can of course adjust this for a non-Spark scenario by replacing bash_command with a template suitable for your case, for example:

bash_command = 'java -jar {{ params.jar }}'

and adjust params accordingly.
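A minimal sketch of that non-Spark variant, assuming the same hypothetical jar path:

java_task = BashOperator(
    task_id='plain_java',
    # Run the jar directly with the JVM instead of spark-submit.
    bash_command='java -jar {{ params.jar }}',
    params={'jar': '/path/to/your.jar'},
    dag=dag
)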

