Spark Shell: Add Multiple Drivers/Jars to the Classpath Using spark-defaults.conf

Problem description

We are using the Spark Shell (REPL mode) to test various use cases and to connect to multiple sources/sinks.

We need to add custom drivers/jars in the spark-defaults.conf file. I have tried adding multiple jars separated by commas, like:

spark.driver.extraClassPath = /home/sandeep/mysql-connector-java-5.1.36.jar 
spark.executor.extraClassPath = /home/sandeep/mysql-connector-java-5.1.36.jar

But it's not working. Can anyone provide details on the correct syntax?

Answer

Note: verified on Linux Mint with Spark 3.0.1.

If you set properties in spark-defaults.conf, Spark will pick up those settings only when you submit your job using spark-submit.

Note: whether spark-shell and pyspark also pick these up still needs to be verified.

File: spark-defaults.conf

spark.driver.extraJavaOptions      -Dlog4j.configuration=file:log4j.properties -Dspark.yarn.app.container.log.dir=app-logs -Dlogfile.name=hello-spark
spark.jars.packages                 org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-avro_2.12:3.0.1
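
Coming back to the original extraClassPath question: as far as I know, spark.driver.extraClassPath / spark.executor.extraClassPath take classpath entries separated by the platform path separator (':' on Linux), while spark.jars takes a comma-separated list of jar files. A minimal sketch for spark-defaults.conf, assuming Linux paths (the second jar name is only a placeholder):

spark.driver.extraClassPath     /home/sandeep/mysql-connector-java-5.1.36.jar:/home/sandeep/some-other-driver.jar
spark.executor.extraClassPath   /home/sandeep/mysql-connector-java-5.1.36.jar:/home/sandeep/some-other-driver.jar
spark.jars                      /home/sandeep/mysql-connector-java-5.1.36.jar,/home/sandeep/some-other-driver.jar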

In the terminal, run your job, say wordcount.py:

spark-submit /path-to-file/wordcount.py
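
The same dependencies can also be passed on the spark-submit command line instead of (or in addition to) spark-defaults.conf; a sketch using the standard --packages option with the coordinates from the file above:

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-avro_2.12:3.0.1 \
  /path-to-file/wordcount.py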

If you want to run your job in development mode from an IDE, use the config() method on the SparkSession builder. Here we set the Kafka jar package and the Avro package. If you also want to include log4j.properties, use extraJavaOptions.

The app name and master can be provided in two ways:

  1. Using .appName() and .master()
  2. Using a .conf file

File: hellospark.py

from logger import Log4j
from util import get_spark_app_config
from pyspark.sql import SparkSession

# First approach: set the app name, master, and all config inline on the builder.
spark = SparkSession.builder \
    .appName('Hello Spark') \
    .master('local[3]') \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,"
            "org.apache.spark:spark-avro_2.12:3.0.1") \
    .config("spark.driver.extraJavaOptions",
            "-Dlog4j.configuration=file:log4j.properties "
            "-Dspark.yarn.app.container.log.dir=app-logs "
            "-Dlogfile.name=hello-spark") \
    .getOrCreate()

# Second approach: load the app name/master from spark.conf, then add extra config.
conf = get_spark_app_config()
spark = SparkSession.builder \
    .config(conf=conf) \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1") \
    .getOrCreate()

logger = Log4j(spark)

File: logger.py

from pyspark.sql import SparkSession


class Log4j(object):
    """Thin wrapper around the JVM log4j logger exposed through the Spark session."""

    def __init__(self, spark: SparkSession):
        conf = spark.sparkContext.getConf()
        app_name = conf.get("spark.app.name")
        log4j = spark._jvm.org.apache.log4j
        self.logger = log4j.LogManager.getLogger(app_name)

    def warn(self, message):
        self.logger.warn(message)

    def info(self, message):
        self.logger.info(message)

    def error(self, message):
        self.logger.error(message)

    def debug(self, message):
        self.logger.debug(message)
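
A quick usage sketch of the wrapper above (the messages are made up; where the output lands depends on how log4j.properties routes the named logger):

from pyspark.sql import SparkSession
from logger import Log4j

spark = SparkSession.builder.appName("Hello Spark").master("local[3]").getOrCreate()
logger = Log4j(spark)
logger.info("Application started")
logger.warn("This is only a demo message")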

File: util.py

import configparser
from pyspark import SparkConf

def get_spark_app_config(enable_delta_lake=False):
    """
    It will read configuration from spark.conf file to create
    an instance of SparkConf(). Can be used to create
    SparkSession.builder.config(conf=conf).getOrCreate()
    :return: instance of SparkConf()
    """
    spark_conf = SparkConf()
    config = configparser.ConfigParser()
    config.read("spark.conf")

    for (key, value) in config.items("SPARK_APP_CONFIGS"):
        spark_conf.set(key, value)

    if enable_delta_lake:
        for (key, value) in config.items("DELTA_LAKE_CONFIGS"):
            spark_conf.set(key, value)
    return spark_conf

File: spark.conf

[SPARK_APP_CONFIGS]
spark.app.name = Hello Spark
spark.master = local[3]
spark.sql.shuffle.partitions = 3

[DELTA_LAKE_CONFIGS]
spark.jars.packages = io.delta:delta-core_2.12:0.7.0
spark.sql.extensions = io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog = org.apache.spark.sql.delta.catalog.DeltaCatalog
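
For completeness, a short sketch of how the two files above fit together when the Delta Lake section is enabled (it assumes spark.conf sits in the working directory, since util.py reads it with a relative path):

from pyspark.sql import SparkSession
from util import get_spark_app_config

# Reads SPARK_APP_CONFIGS and, because of the flag, DELTA_LAKE_CONFIGS as well
conf = get_spark_app_config(enable_delta_lake=True)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Should print the Delta Lake package coordinates from spark.conf
print(spark.sparkContext.getConf().get("spark.jars.packages"))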
