AWS Glue to Redshift: duplicate data?

Question

Here are some bullet points in terms of how I have things set up:

- I have CSV files uploaded to S3 and a Glue crawler set up to create the table and schema.
- I have a Glue job set up that writes the data from the Glue table to our Amazon Redshift database using a JDBC connection. The job is also in charge of mapping the columns and creating the Redshift table.
- By re-running the job, I am getting duplicate rows in Redshift (as expected).

However, is there a way to replace or delete rows before inserting the new data?

书签"功能为启用",但不起作用.

Bookmark functionality is enabled but not working.

How can I connect to Redshift and delete all existing data as part of the job, before pushing data to Redshift, in Python?

Answer

Currently Glue doesn't support bookmarking for JDBC sources.

You can implement an upsert/merge into Redshift in a Glue job using the postactions option (code in Scala):

import com.amazonaws.services.glue.DynamicFrame
import com.amazonaws.services.glue.util.JsonOptions

// Comma-separated column list of the source DataFrame, reused in the merge SQL below
val fields = sourceDf.columns.mkString(",")

// Load into a staging table; the postactions SQL then merges it into the destination
glueContext.getJDBCSink(
  catalogConnection = "RedshiftConnectionTest",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> "staging_schema.staging_table",
    "postactions" ->
        s"""
           DELETE FROM dst_schema.dst_table USING staging_schema.staging_table AS S WHERE dst_table.id = S.id;
           INSERT INTO dst_schema.dst_table ($fields) SELECT $fields FROM staging_schema.staging_table;
           DROP TABLE IF EXISTS staging_schema.staging_table
        """
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))
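
Here the data is first loaded into a staging table, and the postactions SQL runs inside Redshift after the load completes: it deletes destination rows whose id matches a staged row, inserts all staged rows into the destination, and drops the staging table. Adjust the DELETE's join condition to whatever key identifies a duplicate in your data.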

If you just want to clear the existing table's data before each load, you can use the preactions parameter instead:

// preactions runs inside Redshift before the load, so the target table is emptied first
glueContext.getJDBCSink(
  catalogConnection = "RedshiftConnectionTest",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> "dst_schema.dst_table",
    "preactions" -> "DELETE FROM dst_schema.dst_table"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(DynamicFrame(sourceDf, glueContext))
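
Since the question asks how to do this from Python, the same preactions approach can be expressed with the Glue PySpark API. Below is a minimal sketch, not the original answer's code; the catalog database glue_catalog_db and table csv_source_table are hypothetical placeholders, while RedshiftConnectionTest and the Redshift table names mirror the Scala example above:

import sys

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME", "TempDir"])
glueContext = GlueContext(SparkContext.getOrCreate())

# Hypothetical source: the table created by the Glue crawler
source_dynf = glueContext.create_dynamic_frame.from_catalog(
    database="glue_catalog_db",     # hypothetical catalog database name
    table_name="csv_source_table",  # hypothetical crawler-created table
    transformation_ctx="source",
)

# preactions runs inside Redshift before the load, clearing the old rows
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=source_dynf,
    catalog_connection="RedshiftConnectionTest",
    connection_options={
        "database": "conndb",
        "dbtable": "dst_schema.dst_table",
        "preactions": "DELETE FROM dst_schema.dst_table",
    },
    redshift_tmp_dir=args["TempDir"],
    transformation_ctx="redshift-output",
)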
