Java library to write binary format for Postgres COPY?
Question
Has anyone come across a Java library (or just some code) to write the binary format used by Postgres' COPY command?
It looks very simple, but if someone's already figured out the correct tuple data format, I'd just as well start there.
Actually, even just the description of the formats for all data types would be helpful.
Thanks.
Answer
You could try PgBulkInsert, which implements the Binary Copy Protocol of PostgreSQL:
- https://github.com/bytefish/PgBulkInsert
It is also available from the Maven Central Repository.
Disclaimer: I am the project author.
I don't want to simply advertise my project, so I'll also write about the protocol.
First, I wrote a class PgBinaryWriter, which wraps a DataOutputStream. It has three kinds of methods:
- a method for writing the binary protocol header;
- a method to start a new row (the binary COPY format requires you to write the number of columns for each row you insert);
- a write method, which takes an IValueHandler<TTargetType> for writing a given Java type.
PgBinaryWriter implements AutoCloseable, because a -1 (the 16-bit file trailer) has to be written to the stream before it is flushed and closed.
The IValueHandler<TTargetType> takes a DataOutputStream and a value. It is responsible for writing the given value in the PostgreSQL binary format.
```java
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql;

import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;
import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers.IValueHandler;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.OutputStream;

public class PgBinaryWriter implements AutoCloseable {

    /** The buffered stream all binary COPY data is written to. */
    private transient DataOutputStream buffer;

    public PgBinaryWriter() {
    }

    public void open(final OutputStream out) {
        buffer = new DataOutputStream(new BufferedOutputStream(out));

        writeHeader();
    }

    private void writeHeader() {
        try {
            // 11 bytes required header (signature)
            buffer.writeBytes("PGCOPY\n\377\r\n\0");
            // 32 bit flags field; 0 means no OIDs are included
            buffer.writeInt(0);
            // 32 bit header extension area length
            buffer.writeInt(0);
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public void startRow(int numColumns) {
        try {
            // Each tuple starts with a 16 bit field count
            buffer.writeShort(numColumns);
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    public <TTargetType> void write(final IValueHandler<TTargetType> handler, final TTargetType value) {
        handler.handle(buffer, value);
    }

    @Override
    public void close() {
        try {
            // File trailer: a 16 bit field count of -1
            buffer.writeShort(-1);

            buffer.flush();
            buffer.close();
        } catch(Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }
}
```
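The following sketch shows the byte layout that PgBinaryWriter produces. It is a minimal standalone example that uses only java.io and does not depend on PgBulkInsert; the class name CopyLayoutDemo is made up for illustration. It writes a complete binary COPY stream with one row containing a single bigint column. Like writeHeader above, it assumes no OIDs and an empty header extension.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative only: builds header + one row + trailer in memory.
final class CopyLayoutDemo {

    /** Encodes a whole COPY stream with one row holding one bigint field. */
    static byte[] singleBigintRow(long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBytes("PGCOPY\n\377\r\n\0"); // 11-byte signature
            out.writeInt(0);      // flags field (no OIDs)
            out.writeInt(0);      // header extension area length
            out.writeShort(1);    // tuple: number of fields
            out.writeInt(8);      // field: length of the int8 value in bytes
            out.writeLong(value); // field: value in network byte order
            out.writeShort(-1);   // trailer: 16-bit -1
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = singleBigintRow(42L);
        System.out.println(data.length); // prints 35
    }
}
```

The 35 bytes break down as follows:
- 19 header bytes;
- 2 bytes for the field count;
- 4 bytes for the field length;
- 8 value bytes;
- the 2-byte trailer.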
ValueHandler
An IValueHandler is a simple interface with a handle method that takes the DataOutputStream and a value.
```java
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

// ValueHandler is another interface of the project (not shown here).
public interface IValueHandler<TTargetType> extends ValueHandler {

    // Writes the value (length prefix plus value bytes) to the buffer.
    void handle(DataOutputStream buffer, final TTargetType value);

    // The Java type this handler writes.
    Type getTargetType();
}
```
An important detail of the protocol is that a null value is written as a field length of -1 (a 32-bit integer) with no value bytes. For this I have written an abstract base class that handles this case.
```java
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.exceptions.BinaryWriteFailedException;

import java.io.DataOutputStream;

public abstract class BaseValueHandler<T> implements IValueHandler<T> {

    @Override
    public void handle(DataOutputStream buffer, final T value) {
        try {
            if (value == null) {
                // A NULL field is a 32 bit length of -1 without any value bytes
                buffer.writeInt(-1);
                return;
            }
            internalHandle(buffer, value);
        } catch (Exception e) {
            throw new BinaryWriteFailedException(e);
        }
    }

    // Writes the length and the value bytes of a non-null value
    protected abstract void internalHandle(DataOutputStream buffer, final T value) throws Exception;
}
```
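The NULL rule is easy to check in isolation. The following sketch encodes a nullable Integer as an integer (int4) field:
- null becomes only the 32-bit length -1, with no value bytes;
- a value becomes the length 4, followed by the 4 value bytes.

The class NullEncodingDemo is hypothetical and does not reuse the project's classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative only: mirrors the null check of a base value handler.
final class NullEncodingDemo {

    /** Encodes one int4 field (length word plus value bytes) for a nullable Integer. */
    static byte[] encodeInt4Field(Integer value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (value == null) {
                out.writeInt(-1);    // NULL: length -1, no value bytes follow
            } else {
                out.writeInt(4);     // int4 values are always 4 bytes
                out.writeInt(value); // big-endian two's complement
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(encodeInt4Field(null).length); // prints 4
        System.out.println(encodeInt4Field(7).length);    // prints 8
    }
}
```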
Then the handlers for the various Java types can be implemented. Here is the example for long. You can find the other implementations (handlers) in the GitHub repository.
```java
// Copyright (c) Philipp Wagner. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

package de.bytefish.pgbulkinsert.de.bytefish.pgbulkinsert.pgsql.handlers;

import java.io.DataOutputStream;
import java.lang.reflect.Type;

public class LongValueHandler extends BaseValueHandler<Long> {

    @Override
    protected void internalHandle(DataOutputStream buffer, final Long value) throws Exception {
        // bigint (int8) values are always 8 bytes long
        buffer.writeInt(8);
        buffer.writeLong(value);
    }

    @Override
    public Type getTargetType() {
        return Long.class;
    }
}
```
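The other handlers follow the same pattern: a 32-bit length followed by the value bytes. The sketch below covers only the encodings. The class SimpleTypeEncoders and its methods are illustrative and are not the repository's handlers. The formats are:
- smallint and integer are big-endian two's complement integers of 2 and 4 bytes;
- real and double precision are IEEE 754 values of 4 and 8 bytes in network byte order;
- boolean is a single byte, 0 or 1;
- text is the raw string bytes without a terminator.

The text case assumes the client encoding is UTF8, which is what the PostgreSQL JDBC driver uses.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustrative encoders returning complete fields (length prefix + value bytes).
final class SimpleTypeEncoders {

    /** Lets the lambdas below call DataOutputStream methods that throw IOException. */
    private interface FieldBody {
        void write(DataOutputStream out) throws IOException;
    }

    /** Writes the 32-bit length, then whatever the body writes. */
    private static byte[] field(int length, FieldBody body) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(length);
            body.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static byte[] smallint(short v) { return field(2, out -> out.writeShort(v)); }
    static byte[] integer(int v)    { return field(4, out -> out.writeInt(v)); }
    static byte[] real(float v)     { return field(4, out -> out.writeFloat(v)); }
    static byte[] float8(double v)  { return field(8, out -> out.writeDouble(v)); }
    static byte[] bool(boolean v)   { return field(1, out -> out.writeByte(v ? 1 : 0)); }

    /** text: UTF-8 bytes (assumes client_encoding is UTF8), no terminator. */
    static byte[] text(String v) {
        byte[] utf8 = v.getBytes(StandardCharsets.UTF_8);
        return field(utf8.length, out -> out.write(utf8));
    }
}
```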
Using the PgBinaryWriter
Finally, it's time to connect the parts. Please note that I have abstracted away some more parts, so you may need to look up further implementation details in the code.
```java
import java.sql.SQLException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.postgresql.PGConnection;
import org.postgresql.copy.CopyIn;
import org.postgresql.copy.CopyManager;
import org.postgresql.copy.PGCopyOutputStream;

public abstract class PgBulkInsert<TEntity> {

    // ... (fields such as table and columns are omitted here)

    public void saveAll(PGConnection connection, Stream<TEntity> entities) throws SQLException {

        CopyManager cpManager = connection.getCopyAPI();
        CopyIn copyIn = cpManager.copyIn(getCopyCommand());

        int columnCount = columns.size();

        try (PgBinaryWriter bw = new PgBinaryWriter()) {

            // Wrap the CopyOutputStream in our own Writer:
            bw.open(new PGCopyOutputStream(copyIn));

            // Insert all entities:
            entities.forEach(entity -> {

                // Start a New Row:
                bw.startRow(columnCount);

                // Insert the Column Data:
                columns.forEach(column -> {
                    try {
                        column.getWrite().invoke(bw, entity);
                    } catch (Exception e) {
                        // SaveEntityFailedException is a project exception (not shown)
                        throw new SaveEntityFailedException(e);
                    }
                });
            });
        }
    }

    private String getCopyCommand() {
        String commaSeparatedColumns = columns.stream()
                .map(x -> x.columnName)
                .collect(Collectors.joining(", "));

        return String.format("COPY %1$s(%2$s) FROM STDIN BINARY",
                table.GetFullQualifiedTableName(),
                commaSeparatedColumns);
    }
}
```
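getCopyCommand uses the older COPY ... BINARY spelling, which PostgreSQL still accepts. As a hedged alternative, the same statement can use the WITH (FORMAT binary) option syntax available since PostgreSQL 9.0. It can also quote every identifier, so mixed-case or reserved column names work too. CopyCommands, quoteIdent and binaryCopyIn are hypothetical names and are not part of PgBulkInsert.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of PgBulkInsert.
final class CopyCommands {

    /** Double-quotes an identifier, escaping embedded quotes by doubling them. */
    static String quoteIdent(String ident) {
        return "\"" + ident.replace("\"", "\"\"") + "\"";
    }

    /** Builds: COPY "schema"."table" ("col", ...) FROM STDIN WITH (FORMAT binary) */
    static String binaryCopyIn(String schema, String table, List<String> columns) {
        String cols = columns.stream()
                .map(CopyCommands::quoteIdent)
                .collect(Collectors.joining(", "));
        return String.format("COPY %s.%s (%s) FROM STDIN WITH (FORMAT binary)",
                quoteIdent(schema), quoteIdent(table), cols);
    }
}
```

Keep in mind that quoted identifiers are case-sensitive. A table created without quotes, such as sample.unit_test, has lowercase names, and those lowercase names are what must be passed in.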
PgBulkInsert
PgBulkInsert supports the following PostgreSQL data types.
- Numeric types
  - smallint
  - integer
  - bigint
  - real
  - double precision
- timestamp
- date
- text
- boolean
- bytea
- inet (IPv4, IPv6)
- uuid
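Date and time values are the least obvious encodings in this list. In the binary format:
- date is a 32-bit count of days since 2000-01-01;
- timestamp is a 64-bit count of microseconds since 2000-01-01 00:00:00. This assumes integer datetimes, the default since PostgreSQL 8.4 and the only option since 10;
- uuid is its 16 bytes in the usual order.

The following sketch returns complete fields, length prefix included. The class MoreTypeEncoders is illustrative and is not the project's code. It uses java.nio.ByteBuffer, which is big-endian by default and does not throw IOException.

```java
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.UUID;

// Illustrative encoders returning complete fields (length prefix + value bytes).
final class MoreTypeEncoders {

    /** PostgreSQL's epoch for date and timestamp values. */
    private static final LocalDate PG_EPOCH_DATE = LocalDate.of(2000, 1, 1);
    private static final LocalDateTime PG_EPOCH = PG_EPOCH_DATE.atStartOfDay();

    /** date: length 4, then days since 2000-01-01 (negative before it). */
    static byte[] date(LocalDate d) {
        long days = ChronoUnit.DAYS.between(PG_EPOCH_DATE, d);
        return ByteBuffer.allocate(8).putInt(4).putInt(Math.toIntExact(days)).array();
    }

    /** timestamp (without time zone): length 8, then microseconds since the epoch. */
    static byte[] timestamp(LocalDateTime t) {
        long micros = ChronoUnit.MICROS.between(PG_EPOCH, t);
        return ByteBuffer.allocate(12).putInt(8).putLong(micros).array();
    }

    /** uuid: length 16, then the most and least significant 64 bits. */
    static byte[] uuid(UUID u) {
        return ByteBuffer.allocate(20).putInt(16)
                .putLong(u.getMostSignificantBits())
                .putLong(u.getLeastSignificantBits())
                .array();
    }
}
```

For timestamptz, the microseconds are counted from 2000-01-01 00:00:00 UTC. An OffsetDateTime or Instant therefore has to be converted to UTC first; the sketch does not cover that case.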
Imagine a large number of persons should be bulk inserted into a PostgreSQL database. Each Person has a first name, a last name and a birth date.
The table in the PostgreSQL database might look like this:
```sql
CREATE TABLE sample.unit_test
(
    first_name text,
    last_name text,
    birth_date date
);
```
Domain Model
The domain model in the application might look like this:
```java
private class Person {

    private String firstName;

    private String lastName;

    private LocalDate birthDate;

    public Person() {}

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public LocalDate getBirthDate() {
        return birthDate;
    }

    public void setBirthDate(LocalDate birthDate) {
        this.birthDate = birthDate;
    }
}
```
Bulk Inserter
Then you have to implement PgBulkInsert<Person>, which defines the mapping between the table and the domain model.
```java
public class PersonBulkInserter extends PgBulkInsert<Person> {

    public PersonBulkInserter() {
        super("sample", "unit_test");

        MapString("first_name", Person::getFirstName);
        MapString("last_name", Person::getLastName);
        MapDate("birth_date", Person::getBirthDate);
    }
}
```
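The answer only shows MapString and MapDate at their call sites. The sketch below shows one way such a mapping could work internally. Every name in it (RowMapper, mapText, mapDate, columnNames, encodeRow) is hypothetical and is not PgBulkInsert's API. Each column pairs a getter with a field encoder. Encoding an entity writes the 16-bit field count, then each column's field, with NULL written as length -1. This resembles the columns.forEach loop in saveAll.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical column mapper; not PgBulkInsert's actual implementation.
final class RowMapper<T> {

    /** Writes the length prefix and bytes of one non-null value. */
    private interface FieldWriter<V> {
        void write(DataOutputStream out, V value) throws IOException;
    }

    /** One column: its name, how to read the value, how to encode it. */
    private static final class ColumnMapping<E, V> {
        final String name;
        final Function<E, V> getter;
        final FieldWriter<V> writer;

        ColumnMapping(String name, Function<E, V> getter, FieldWriter<V> writer) {
            this.name = name;
            this.getter = getter;
            this.writer = writer;
        }

        void writeField(DataOutputStream out, E entity) throws IOException {
            V value = getter.apply(entity);
            if (value == null) {
                out.writeInt(-1); // NULL: length -1, no value bytes
            } else {
                writer.write(out, value);
            }
        }
    }

    private final List<ColumnMapping<T, ?>> columns = new ArrayList<>();

    /** text column, encoded as UTF-8 (assumes client_encoding UTF8). */
    RowMapper<T> mapText(String name, Function<T, String> getter) {
        columns.add(new ColumnMapping<T, String>(name, getter, (out, v) -> {
            byte[] utf8 = v.getBytes(StandardCharsets.UTF_8);
            out.writeInt(utf8.length);
            out.write(utf8);
        }));
        return this;
    }

    /** date column, encoded as days since 2000-01-01. */
    RowMapper<T> mapDate(String name, Function<T, LocalDate> getter) {
        columns.add(new ColumnMapping<T, LocalDate>(name, getter, (out, v) -> {
            out.writeInt(4);
            out.writeInt((int) ChronoUnit.DAYS.between(LocalDate.of(2000, 1, 1), v));
        }));
        return this;
    }

    /** Column names in mapping order, e.g. for the COPY column list. */
    List<String> columnNames() {
        List<String> names = new ArrayList<>();
        for (ColumnMapping<T, ?> column : columns) {
            names.add(column.name);
        }
        return names;
    }

    /** Encodes one row: 16-bit field count, then each column's field in order. */
    byte[] encodeRow(T entity) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeShort(columns.size());
            for (ColumnMapping<T, ?> column : columns) {
                column.writeField(out, entity);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

The mapper keeps its columns in a list, so columnNames() and the fields written by encodeRow share the same order. The column list in the COPY command must follow that same order.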
Using the Bulk Inserter
And finally we can write a unit test to insert 100000 persons into the database. You can find the entire unit test on GitHub: IntegrationTest.java.
```java
@Test
public void bulkInsertPersonDataTest() throws SQLException {
    // Create a large list of Persons:
    List<Person> persons = getPersonList(100000);

    // Create the BulkInserter:
    PersonBulkInserter personBulkInserter = new PersonBulkInserter();

    // Now save all entities of a given stream:
    personBulkInserter.saveAll(PostgreSqlUtils.getPGConnection(connection), persons.stream());

    // And assert all have been written to the database:
    Assert.assertEquals(100000, getRowCount());
}

private List<Person> getPersonList(int numPersons) {
    List<Person> persons = new ArrayList<>();

    for (int pos = 0; pos < numPersons; pos++) {
        Person p = new Person();

        p.setFirstName("Philipp");
        p.setLastName("Wagner");
        p.setBirthDate(LocalDate.of(1986, 5, 12));

        persons.add(p);
    }

    return persons;
}
```