how to use nextval() in a postgres jdbc driver for pyspark?


Question

I have a table named "mytable" in Postgres with two columns, id (bigint) and value (varchar(255)).

id gets its value from a sequence using nextval('my_sequence').

A PySpark application takes a dataframe and uses the Postgres JDBC jar (postgresql-42.1.4.jar) to insert the dataframe into "mytable". I'm creating the id column using:

df.withColumn('id', lit("nextval('my_sequence')"))

Postgres is interpreting the column as 'character varying'.

I can see that there are ways for calling Postgres methods when reading data (How to remotely execute a Postgres SQL function on Postgres using PySpark JDBC connector?), but I'm not sure how to call a Postgres function like nextval() for writing data to Postgres.

Here's how I am currently writing the data from Pyspark to Postgres:

df.write.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", 'mytable') \
    .mode('append') \
    .save()

How can one write to a Postgres table using PySpark when one column needs a sequence number using nextval()?

Answer

TL;DR You cannot execute database code on insert unless you create your own JdbcDialect and override insert logic. I reckon it is not something you want to do for such a small feature.

Personally, I would use a trigger:

CREATE FUNCTION set_id() RETURNS trigger AS $set_id$
  BEGIN
    IF NEW.id IS NULL THEN
      NEW.id = nextval('my_sequence');
    END IF;
    RETURN NEW;
  END;
$set_id$ LANGUAGE plpgsql;

CREATE TRIGGER set_id BEFORE INSERT ON mytable
    FOR EACH ROW EXECUTE PROCEDURE set_id();

and leave the rest of the job to the database server.

from pyspark.sql.functions import col, lit

df.select(lit(None).cast("bigint").alias("id"), col("value")).write \
    ...

You could also use monotonically_increasing_id (Primary keys with Apache Spark) and just shift values according to the largest id in the database, but it might be brittle.
