AWS Glue to Redshift: Is it possible to replace, update or delete data?

Problem Description

Here are some bullet points in terms of how I have things set up:

  • I have CSV files uploaded to S3 and a Glue crawler set up to create the table and schema.
  • I have a Glue job set up that writes the data from the Glue table to our Amazon Redshift database using a JDBC connection. The job is also in charge of mapping the columns and creating the Redshift table.

By re-running the job, I am getting duplicate rows in Redshift (as expected). However, is there a way to replace or delete rows before inserting the new data, using a key or the partitions set up in Glue?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame

from pyspark.sql.functions import lit

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## (source column, source type, target column, target type)
columnMapping = [
    ("id", "int", "id", "int"),
    ("name", "string", "name", "string"),
]

## Read the crawled CSV table from the Glue Data Catalog
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource1")

## Rename/cast columns, resolve ambiguous types, and drop null-type fields
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")

## Add a constant 'platform' column via Spark, then convert back to a DynamicFrame
df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")

## Write data to Redshift through the catalog JDBC connection
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()

Solution

This was the solution I got from AWS Glue Support:

As you may know, although you can create primary keys, Redshift doesn't enforce uniqueness. Therefore, if you are rerunning Glue jobs then duplicate rows can get inserted. Some of the ways to maintain uniqueness are:

  1. Use a staging table to insert all rows and then perform an upsert/merge [1] into the main table; this has to be done outside of Glue (see the first sketch after this list).

  2. Add another column to your Redshift table [1], like an insert timestamp, to allow duplicates but still know which row came first or last, and then delete the duplicates afterwards if you need to.

  3. Load the previously inserted data into a dataframe and then compare it with the data to be inserted, to avoid inserting duplicates [3] (see the second sketch after this list).
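
For option 1, here is a minimal sketch of the staging-table merge described in [1], run outside of Glue (for example from a small Python script or a scheduled task). The staging table name table01_staging, the join key id, and the connection parameters are placeholders for illustration and not part of the original setup; it also assumes the Glue job was changed to write into the staging table instead of table01.

import psycopg2  # any PostgreSQL-compatible client can talk to Redshift

## Placeholder connection details -- replace with your cluster endpoint and credentials.
conn = psycopg2.connect(
    host="my-cluster.xxxxxxxxxxxx.us-east-1.redshift.amazonaws.com",
    port=5439,
    dbname="db01",
    user="awsuser",
    password="...",
)

cur = conn.cursor()
## Delete the rows that are about to be replaced, insert the fresh rows from the
## staging table, then drop the staging table -- all committed as one transaction.
cur.execute("DELETE FROM table01 USING table01_staging "
            "WHERE table01.id = table01_staging.id;")
cur.execute("INSERT INTO table01 SELECT * FROM table01_staging;")
cur.execute("DROP TABLE table01_staging;")
conn.commit()

cur.close()
conn.close()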
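
For option 3, a minimal PySpark sketch that could be dropped into the job above just before the write step. It assumes id uniquely identifies a row and that the current contents of the Redshift table can be read back over JDBC; the URL and credentials below are placeholders (a crawled catalog table pointing at Redshift would work as well).

## Read the ids already present in Redshift (placeholder JDBC options).
existing_ids = (spark.read.format("jdbc")
    .option("url", "jdbc:redshift://my-cluster:5439/db01")
    .option("dbtable", "table01")
    .option("user", "awsuser")
    .option("password", "...")
    .load()
    .select("id"))

## Keep only rows whose id is not already in the target table, then convert back
## to a DynamicFrame so the existing write_dynamic_frame call can be reused as-is.
new_rows = data1.toDF().join(existing_ids, on="id", how="left_anti")
data1 = DynamicFrame.fromDF(new_rows, glueContext, "data_dedup1")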

[1] - http://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-upsert.html and http://www.silota.com/blog/amazon-redshift-upsert-support-staging-table-replace-rows/

[2] - https://github.com/databricks/spark-redshift/issues/238

[3] - https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html
