Spark 2.4.0 dependencies to write to AWS Redshift


Problem description

I'm struggling to find the correct package dependencies and their respective versions to write to a Redshift DB with a PySpark micro-batch approach.

What are the correct dependencies to achieve this goal?

Recommended answer

As suggested in the AWS tutorial, it is necessary to provide a JDBC driver:

wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar

Once this jar has been downloaded and made available to the spark-submit command, this is how I provided the dependencies to it:

spark-submit --master yarn --deploy-mode cluster \
  --jars RedshiftJDBC4-no-awssdk-1.2.20.1043.jar \
  --packages com.databricks:spark-redshift_2.10:2.0.0,org.apache.spark:spark-avro_2.11:2.4.0,com.eclipsesource.minimal-json:minimal-json:0.9.4 \
  my_script.py
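
As an aside, the same driver jar and connector packages could presumably also be declared on the SparkSession builder rather than on the spark-submit command line. A minimal sketch, assuming the jar sits in the working directory and with an illustrative application name:

from pyspark.sql import SparkSession

# Sketch of an alternative to the --jars/--packages flags above: declare the
# Redshift JDBC driver and the connector packages as Spark configuration.
# The jar path and app name are illustrative placeholders.
spark = (
    SparkSession.builder
    .appName("redshift-micro-batch")
    .config("spark.jars", "RedshiftJDBC4-no-awssdk-1.2.20.1043.jar")
    .config(
        "spark.jars.packages",
        "com.databricks:spark-redshift_2.10:2.0.0,"
        "org.apache.spark:spark-avro_2.11:2.4.0,"
        "com.eclipsesource.minimal-json:minimal-json:0.9.4",
    )
    .getOrCreate()
)

Either way, the dependencies have to be resolvable before the session starts, so this form only helps when the script creates a fresh session rather than attaching to one that is already running.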

And finally, this is the script I passed to spark-submit:

from pyspark.sql import SparkSession

# Write each micro-batch to Redshift through the spark-redshift connector,
# staging the data in S3 (tempdir) and loading it with the given IAM role.
def foreach_batch_function(df, epoch_id):
    df.write \
        .format("com.databricks.spark.redshift") \
        .option("aws_iam_role", my_role) \
        .option("url", my_redshift_url) \
        .option("user", my_redshift_user) \
        .option("password", my_redshift_password) \
        .option("dbtable", my_redshift_schema + "." + my_redshift_table) \
        .option("tempdir", "s3://my/temp/dir") \
        .mode("append") \
        .save()

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext

# S3 credentials used for the temporary staging directory
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", my_aws_access_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", my_aws_secret_access_key)

# Reuse the schema of an existing Parquet file for the streaming source
my_schema = spark.read.parquet(my_schema_file_path).schema

df = spark \
    .readStream \
    .schema(my_schema) \
    .option("maxFilesPerTrigger", 100) \
    .parquet(my_source_path)

# Process the stream in 30-second micro-batches, writing each one via foreachBatch
df.writeStream \
    .trigger(processingTime='30 seconds') \
    .foreachBatch(foreach_batch_function) \
    .option("checkpointLocation", my_checkpoint_location) \
    .start(outputMode="update").awaitTermination()
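
The my_* names above are placeholders from the original script. A minimal sketch, assuming they are read from environment variables (the variable names below are hypothetical; a config file or secrets manager would work just as well):

import os

# Hypothetical environment variables standing in for the placeholders above.
my_role = os.environ["REDSHIFT_IAM_ROLE"]
my_redshift_url = os.environ["REDSHIFT_JDBC_URL"]        # e.g. jdbc:redshift://host:5439/dbname
my_redshift_user = os.environ["REDSHIFT_USER"]
my_redshift_password = os.environ["REDSHIFT_PASSWORD"]
my_redshift_schema = os.environ["REDSHIFT_SCHEMA"]
my_redshift_table = os.environ["REDSHIFT_TABLE"]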
