Spark 2.4.0 dependencies to write to AWS Redshift
Problem description
I'm struggling to find the correct package dependencies and their respective versions to write to a Redshift DB with a PySpark micro-batch approach.
What are the correct dependencies to achieve this goal?
Recommended answer
As suggested in the AWS tutorial, it is necessary to provide a JDBC driver:
wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar
After downloading this jar and making it available to the spark-submit command, this is how I provided the dependencies to it:
spark-submit --master yarn --deploy-mode cluster \
--jars RedshiftJDBC4-no-awssdk-1.2.20.1043.jar \
--packages com.databricks:spark-redshift_2.10:2.0.0,org.apache.spark:spark-avro_2.11:2.4.0,com.eclipsesource.minimal-json:minimal-json:0.9.4 \
my_script.py
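Keeping the Maven coordinates from the --packages flag in one place makes it easier to keep driver, spark-avro, and Spark versions in sync. A minimal sketch (the variable names here are my own, not part of the original command) that builds the comma-separated value spark-submit expects:

```python
# The exact coordinates used in the spark-submit command above.
packages = [
    "com.databricks:spark-redshift_2.10:2.0.0",
    "org.apache.spark:spark-avro_2.11:2.4.0",
    "com.eclipsesource.minimal-json:minimal-json:0.9.4",
]

# --packages takes a single comma-separated string of coordinates.
packages_arg = ",".join(packages)
print(packages_arg)
```

The same string can also be set programmatically via the `spark.jars.packages` configuration key before the session is created, instead of on the command line.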
Finally, this is the script I passed to spark-submit:
from pyspark.sql import SparkSession

# Batch writer invoked by foreachBatch: appends each micro-batch to Redshift,
# staging the data through the S3 tempdir.
def foreach_batch_function(df, epoch_id):
    df.write \
        .format("com.databricks.spark.redshift") \
        .option("aws_iam_role", my_role) \
        .option("url", my_redshift_url) \
        .option("user", my_redshift_user) \
        .option("password", my_redshift_password) \
        .option("dbtable", my_redshift_schema + "." + my_redshift_table) \
        .option("tempdir", "s3://my/temp/dir") \
        .mode("append") \
        .save()

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# S3 credentials for the temp staging directory.
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", my_aws_access_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", my_aws_secret_access_key)

# Reuse the schema of an existing parquet file for the streaming source.
my_schema = spark.read.parquet(my_schema_file_path).schema

df = spark \
    .readStream \
    .schema(my_schema) \
    .option("maxFilesPerTrigger", 100) \
    .parquet(my_source_path)

df.writeStream \
    .trigger(processingTime='30 seconds') \
    .foreachBatch(foreach_batch_function) \
    .option("checkpointLocation", my_checkpoint_location) \
    .start(outputMode="update").awaitTermination()
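The `url` option in the batch function expects a standard Redshift JDBC URL of the form `jdbc:redshift://host:port/database`. As a hedged sketch (the helper name and the host, port, and database values below are placeholders of mine, not from the original setup), `my_redshift_url` can be assembled like this:

```python
# Hypothetical helper: build the JDBC URL expected by the "url" option.
def redshift_jdbc_url(host, port, database):
    return f"jdbc:redshift://{host}:{port}/{database}"

# Placeholder values; 5439 is Redshift's default port.
print(redshift_jdbc_url("localhost", 5439, "dev"))  # → jdbc:redshift://localhost:5439/dev
```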