Java library to write binary format for Postgres COPY?


Question

Has anyone come across a Java library (or just some code) that writes the binary format used by Postgres' COPY command?

It looks very simple, but if someone has already figured out the correct tuple data format, I'd just as well start there.

Actually, even just a description of the formats for all data types would be helpful.

Thanks.

Recommended answer

You could try PgBulkInsert, which implements the binary COPY protocol of PostgreSQL:

  • https://github.com/bytefish/PgBulkInsert

It is also available from the Maven Central Repository.

Disclaimer: I am the project author.

I don't want to merely advertise my project, so here is also a description of the protocol.

First of all, I have written a class PgBinaryWriter, which wraps a DataOutputStream. It has a method for writing the binary protocol header, a method to start a new row (the binary COPY protocol requires you to write the number of columns for each row you are going to insert), and a write method, which takes an IValueHandler<TTargetType> for writing a given Java type.

PgBinaryWriter implements AutoCloseable, because a 16-bit -1 has to be written to the stream as a trailer before the stream is flushed and closed.

The IValueHandler<TTargetType> takes a DataOutputStream and a value. It is responsible for writing the given value in the PostgreSQL binary protocol format.

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql;


import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers.IValueHandler;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;

public class PgBinaryWriter implements AutoCloseable {

    /** The stream the binary COPY output is written to. */
    private transient DataOutputStream buffer;

    public PgBinaryWriter() {
    }

    public void open(final OutputStream out) {
        buffer = new DataOutputStream(new BufferedOutputStream(out));

        writeHeader();
    }

    private void writeHeader() {
        try {

            // 11-byte signature required at the start of every binary COPY stream
            buffer.writeBytes("PGCOPY\n\377\r\n\0");
            // 32-bit flags field; 0 means no OIDs are included
            buffer.writeInt(0);
            // 32-bit header extension area length; 0 means no extension
            buffer.writeInt(0);

        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public void startRow(int numColumns) {
        try {
            buffer.writeShort(numColumns);
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public <TTargetType> void write(final IValueHandler<TTargetType> handler, final TTargetType value) {
        handler.handle(buffer, value);
    }

    @Override
    public void close() {
        try {
            buffer.writeShort(-1);

            buffer.flush();
            buffer.close();
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }
}
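
To make the byte layout concrete, here is a minimal, self-contained sketch that produces a complete binary COPY stream for a single row with one bigint column. It does not use PgBulkInsert; the class name BinaryCopyLayoutSketch and the method encodeSingleBigintRow are hypothetical names chosen for illustration, and the stream is written to an in-memory buffer instead of a database connection.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative sketch only: hypothetical class, not part of PgBulkInsert.
final class BinaryCopyLayoutSketch {

    /** Encodes one row with a single bigint column (or NULL) as a complete binary COPY stream. */
    static byte[] encodeSingleBigintRow(Long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // Header: 11-byte signature, 32-bit flags field, 32-bit extension length.
            out.writeBytes("PGCOPY\n\377\r\n\0");
            out.writeInt(0);
            out.writeInt(0);

            // Tuple: 16-bit field count, then each field as a 32-bit length plus data.
            out.writeShort(1);
            if (value == null) {
                out.writeInt(-1);   // NULL: length -1, no data bytes follow
            } else {
                out.writeInt(8);    // a bigint is 8 bytes, big-endian
                out.writeLong(value);
            }

            // Trailer: a 16-bit -1 where the next field count would be.
            out.writeShort(-1);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(encodeSingleBigintRow(42L).length + " bytes with a value");
        System.out.println(encodeSingleBigintRow(null).length + " bytes with NULL");
    }
}
```

The header takes 19 bytes in total (11 + 4 + 4), so most of the size of a real stream comes from the per-field length prefixes and the data itself.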



ValueHandler

An IValueHandler is a simple interface with a handle method that takes the DataOutputStream and a value.

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public interface IValueHandler<TTargetType> extends ValueHandler {

    void handle(DataOutputStream buffer, final TTargetType value);

    Type getTargetType();

}

An important detail of the protocol is that you have to write a -1 as the field length when a value is null. I have written an abstract base class that handles this case.

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;

import java.io.DataOutputStream;

public abstract class BaseValueHandler<T> implements IValueHandler<T> {

    @Override
    public void handle(DataOutputStream buffer, final T value) {
        try {
            if (value == null) {
                buffer.writeInt(-1);
                return;
            }
            internalHandle(buffer, value);
        } catch (Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    protected abstract void internalHandle(DataOutputStream buffer, final T value) throws Exception;
}

Then the handlers for the various Java types can be implemented. Here is the example for long. You can find the other implementations in the GitHub repository (handlers).

// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public class LongValueHandler extends BaseValueHandler<Long> {

    @Override
    protected void internalHandle(DataOutputStream buffer, final Long value) throws Exception {
        buffer.writeInt(8);
        buffer.writeLong(value);
    }

    @Override
    public Type getTargetType() {
        return Long.class;
    }
}
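
The same pattern (a 32-bit length followed by the raw value, or -1 for null) applies to other types. The following sketch shows how text, integer, and boolean fields could be encoded. It is not the code from the repository: FieldEncodingSketch, FieldWriter, and toBytes are hypothetical names, and the text encoding assumes that the connection's client_encoding is UTF8.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustrative sketch only: hypothetical class, not the handlers from PgBulkInsert.
final class FieldEncodingSketch {

    /** Hypothetical callback type, used only to capture the bytes a writer produces. */
    interface FieldWriter {
        void write(DataOutputStream out) throws IOException;
    }

    static void writeText(DataOutputStream out, String value) throws IOException {
        if (value == null) {
            out.writeInt(-1);
            return;
        }
        // Assumption: client_encoding is UTF8, so the raw UTF-8 bytes are sent.
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
    }

    static void writeInteger(DataOutputStream out, Integer value) throws IOException {
        if (value == null) {
            out.writeInt(-1);
            return;
        }
        out.writeInt(4);        // integer: 4 bytes, big-endian
        out.writeInt(value);
    }

    static void writeBoolean(DataOutputStream out, Boolean value) throws IOException {
        if (value == null) {
            out.writeInt(-1);
            return;
        }
        out.writeInt(1);        // boolean: a single byte, 1 or 0
        out.writeByte(value ? 1 : 0);
    }

    /** Runs a writer against an in-memory stream and returns the bytes it produced. */
    static byte[] toBytes(FieldWriter writer) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writer.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] field = toBytes(out -> writeText(out, "abc"));
        System.out.println("text field length in bytes: " + field.length);
    }
}
```

Note that the length prefix of a text field counts encoded bytes, not Java chars, which is why the string is converted to a byte array before the length is written.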



Using the PgBinaryWriter

Now it finally comes to connecting the parts. Please note that I have abstracted some more parts, so you may need to look up further implementation details in the code.

public abstract class PgBulkInsert<TEntity> {

    // ... 

    public void saveAll(PGConnection connection, Stream<TEntity> entities) throws SQLException {

        CopyManager cpManager = connection.getCopyAPI();
        CopyIn copyIn = cpManager.copyIn(getCopyCommand());

        int columnCount = columns.size();

        try (PgBinaryWriter bw = new PgBinaryWriter()) {

            // Wrap the CopyOutputStream in our own Writer:
            bw.open(new PGCopyOutputStream(copyIn));

            // Insert all entities:                
            entities.forEach(entity -> {

                // Start a New Row:
                bw.startRow(columnCount);

                // Insert the Column Data:
                columns.forEach(column -> {
                    try {
                        column.getWrite().invoke(bw, entity);
                    } catch (Exception e) {
                        throw new SaveEntityFailedException(e);
                    }
                });
            });
        }
    }

    private String getCopyCommand()
    {
        String commaSeparatedColumns = columns.stream()
                .map(x -> x.columnName)
                .collect(Collectors.joining(", "));

        return String.format("COPY %1$s(%2$s) FROM STDIN BINARY",
                table.GetFullQualifiedTableName(),
                commaSeparatedColumns);
    }
}
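
As a rough illustration of what getCopyCommand produces, the sketch below builds the statement from plain strings. CopyCommandSketch and buildCopyCommand are hypothetical names, and the sketch assumes the schema, table, and column names need no quoting.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only: hypothetical class, not part of PgBulkInsert.
final class CopyCommandSketch {

    /** Builds a COPY statement that reads binary data from STDIN for the given columns. */
    static String buildCopyCommand(String schema, String table, List<String> columns) {
        // Assumption: identifiers are plain lowercase names that need no quoting.
        return String.format("COPY %s.%s(%s) FROM STDIN BINARY",
                schema, table, String.join(", ", columns));
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("first_name", "last_name", "birth_date");
        System.out.println(buildCopyCommand("sample", "unit_test", columns));
    }
}
```

The order of the columns in the statement has to match the order in which the fields of each row are written to the stream.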



PgBulkInsert

PgBulkInsert supports the following PostgreSQL data types:


  • Numeric Types
    • smallint
    • integer
    • bigint
    • real
    • double precision
  • timestamp
  • date
  • text
  • boolean
  • bytea
  • inet (IPv4, IPv6)
  • uuid

Imagine that a large number of persons should be bulk inserted into a PostgreSQL database. Each Person has a first name, a last name and a birth date.

The table in the PostgreSQL database might look like this:

      CREATE TABLE sample.unit_test
      (
          first_name text,
          last_name text,
          birth_date date
      );
      



Domain Model

The domain model in the application might look like this:

      private class Person {
      
          private String firstName;
      
          private String lastName;
      
          private LocalDate birthDate;
      
          public Person() {}
      
          public String getFirstName() {
              return firstName;
          }
      
          public void setFirstName(String firstName) {
              this.firstName = firstName;
          }
      
          public String getLastName() {
              return lastName;
          }
      
          public void setLastName(String lastName) {
              this.lastName = lastName;
          }
      
          public LocalDate getBirthDate() {
              return birthDate;
          }
      
          public void setBirthDate(LocalDate birthDate) {
              this.birthDate = birthDate;
          }
      
      }
      



Bulk Inserter

Then you have to implement a PgBulkInsert<Person>, which defines the mapping between the table and the domain model.

      public class PersonBulkInserter extends PgBulkInsert<Person>
      {
          public PersonBulkInserter() {
              super("sample", "unit_test");
      
              MapString("first_name", Person::getFirstName);
              MapString("last_name", Person::getLastName);
              MapDate("birth_date", Person::getBirthDate);
          }
      }
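
To show what such a mapping amounts to on the wire, here is a sketch that encodes one Person row (two text columns and one date column) by hand. It is not how PgBulkInsert is implemented internally; PersonRowSketch, toPostgresDays, and encodeRow are hypothetical names. The sketch relies on the binary representation of a date being a 32-bit count of days since 2000-01-01, and it assumes UTF8 as the client encoding for the text columns.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Illustrative sketch only: hypothetical class, not part of PgBulkInsert.
final class PersonRowSketch {

    /** Binary dates are counted in days relative to 2000-01-01. */
    private static final LocalDate POSTGRES_EPOCH = LocalDate.of(2000, 1, 1);

    static int toPostgresDays(LocalDate date) {
        return Math.toIntExact(ChronoUnit.DAYS.between(POSTGRES_EPOCH, date));
    }

    private static void writeText(DataOutputStream out, String value) throws IOException {
        if (value == null) {
            out.writeInt(-1);
            return;
        }
        // Assumption: client_encoding is UTF8.
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        out.writeInt(utf8.length);
        out.write(utf8);
    }

    /** Encodes a single tuple (without file header and trailer) for first_name, last_name, birth_date. */
    static byte[] encodeRow(String firstName, String lastName, LocalDate birthDate) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeShort(3);                  // number of fields in this row
            writeText(out, firstName);
            writeText(out, lastName);
            if (birthDate == null) {
                out.writeInt(-1);               // NULL date
            } else {
                out.writeInt(4);                // a date is 4 bytes
                out.writeInt(toPostgresDays(birthDate));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] row = encodeRow("Philipp", "Wagner", LocalDate.of(1986, 5, 12));
        System.out.println("row size in bytes: " + row.length);
    }
}
```

Dates before 2000-01-01 simply produce a negative day count.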
      



Using the Bulk Inserter

Finally, we can write a unit test that inserts 100,000 persons into the database. You can find the entire unit test on GitHub: IntegrationTest.java.

      @Test
      public void bulkInsertPersonDataTest() throws SQLException {
          // Create a large list of Persons:
          List<Person> persons = getPersonList(100000);
      
          // Create the BulkInserter:
          PersonBulkInserter personBulkInserter = new PersonBulkInserter();
      
          // Now save all entities of a given stream:
          personBulkInserter.saveAll(PostgreSqlUtils.getPGConnection(connection), persons.stream());
      
          // And assert all have been written to the database:
          Assert.assertEquals(100000, getRowCount());
      }
      
      private List<Person> getPersonList(int numPersons) {
          List<Person> persons = new ArrayList<>();
      
          for (int pos = 0; pos < numPersons; pos++) {
              Person p = new Person();
      
              p.setFirstName("Philipp");
              p.setLastName("Wagner");
              p.setBirthDate(LocalDate.of(1986, 5, 12));
      
              persons.add(p);
          }
      
          return persons;
      }
      
