Writing protobuf objects in Parquet using Apache Beam
Question
I fetch protobuf data from Google Pub/Sub and deserialize it into objects of type Message, so I get a PCollection<Message>. Here is sample code:
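import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Message is the protobuf class generated from the .proto schema of the Pub/Sub payload.
public class ProcessPubsubMessage extends DoFn<PubsubMessage, Message> {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessPubsubMessage.class);

    @ProcessElement
    public void processElement(@Element PubsubMessage element, OutputReceiver<Message> receiver) {
        byte[] payload = element.getPayload();
        try {
            Message message = Message.parseFrom(payload);
            receiver.output(message);
        } catch (InvalidProtocolBufferException e) {
            // Payloads that fail to parse are logged and dropped.
            LOG.error("Got exception while parsing message from pubsub. Exception => " + e.getMessage());
        }
    }
}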
PCollection<Message> event = psMessage.apply("Parsing data from pubsub message",
ParDo.of(new ProcessPubsubMessage()));
I want to apply a transformation to the PCollection<Message> event to write it in Parquet format. I know Apache Beam provides ParquetIO, but it works with PCollection<GenericRecord>, so converting from Message to GenericRecord may solve the problem (though I don't know how to do that). Is there an easy way to write in Parquet format?
Recommended answer
This can be solved with the following library:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-protobuf</artifactId>
<version>1.7.7</version>
</dependency>
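import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.protobuf.ProtobufDatumReader;
import org.apache.avro.protobuf.ProtobufDatumWriter;
// Event is the protobuf-generated class (Message in the question): protobuf -> Avro binary -> GenericRecord.
private GenericRecord getGenericRecord(Event event) throws IOException {
    ProtobufDatumWriter<Event> datumWriter = new ProtobufDatumWriter<Event>(Event.class);
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    Encoder e = EncoderFactory.get().binaryEncoder(os, null);
    datumWriter.write(event, e);
    e.flush();
    ProtobufDatumReader<Event> datumReader = new ProtobufDatumReader<Event>(Event.class);
    GenericDatumReader<GenericRecord> genericDatumReader = new GenericDatumReader<GenericRecord>(datumReader.getSchema());
    GenericRecord record = genericDatumReader.read(null, DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(os.toByteArray()), null));
    return record;
}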
Details: https://gist.github.com/alexvictoor/1d3937f502c60318071f
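The conversion yields a GenericRecord, but the question also asks how to hand the result to ParquetIO. Below is a minimal sketch of that wiring, under several assumptions: Beam's `beam-sdks-java-io-parquet` module and avro-protobuf are on the classpath; the class `ProtoToGenericRecordFn`, the `writeAsParquet` helper, the 5-minute window and the single shard are hypothetical choices made for illustration; and `AvroCoder` is imported from `org.apache.beam.sdk.coders`, where older Beam releases keep it (newer releases move it to `org.apache.beam.sdk.extensions.avro.coders`). It performs the same protobuf -> Avro binary -> GenericRecord round trip as the answer, but builds the Avro writer and reader once per DoFn instance in `@Setup` rather than once per element.

```java
import com.google.protobuf.Message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.protobuf.ProtobufData;
import org.apache.avro.protobuf.ProtobufDatumWriter;
import org.apache.beam.sdk.coders.AvroCoder; // assumption: older Beam; newer: org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

/** Hypothetical DoFn: converts protobuf messages of class T into Avro GenericRecords. */
public class ProtoToGenericRecordFn<T extends Message> extends DoFn<T, GenericRecord> {
  private final Class<T> protoClass;
  // Avro objects are created in @Setup and kept transient, so the DoFn itself stays serializable.
  private transient ProtobufDatumWriter<T> writer;
  private transient GenericDatumReader<GenericRecord> reader;

  public ProtoToGenericRecordFn(Class<T> protoClass) {
    this.protoClass = protoClass;
  }

  @Setup
  public void setup() {
    Schema schema = ProtobufData.get().getSchema(protoClass); // Avro schema derived from the .proto
    writer = new ProtobufDatumWriter<>(protoClass);
    reader = new GenericDatumReader<>(schema);
  }

  /** Same round trip as the answer: protobuf -> Avro binary -> GenericRecord. */
  public GenericRecord toGenericRecord(T message) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
    writer.write(message, encoder);
    encoder.flush();
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null);
    return reader.read(null, decoder);
  }

  @ProcessElement
  public void processElement(@Element T message, OutputReceiver<GenericRecord> out)
      throws IOException {
    out.output(toGenericRecord(message));
  }

  /** Hypothetical helper: converts the messages and writes them as windowed Parquet files. */
  public static <T extends Message> void writeAsParquet(
      PCollection<T> messages, Class<T> protoClass, String outputDir) {
    Schema schema = ProtobufData.get().getSchema(protoClass);
    messages
        .apply("ProtoToGenericRecord", ParDo.of(new ProtoToGenericRecordFn<>(protoClass)))
        // GenericRecord has no default coder, so AvroCoder is given the schema explicitly.
        .setCoder(AvroCoder.of(schema))
        // Pub/Sub is unbounded: fixed windows let each window's files be finalized.
        .apply("Window", Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("WriteParquet",
            FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(schema))
                .to(outputDir)
                .withSuffix(".parquet")
                .withNumShards(1)); // example value; some Beam versions require it for unbounded input
  }
}
```

With the question's pipeline this would be called as `ProtoToGenericRecordFn.writeAsParquet(event, Message.class, "gs://your-bucket/output/")`, where the bucket path is a placeholder. The windowing step matters because the input comes from Pub/Sub: with the default global window and trigger, files for an unbounded PCollection would likely never be finalized. If Avro class conflicts show up at runtime, choosing an avro-protobuf version that matches the Avro version Beam already pulls in (rather than 1.7.7) is probably the first thing to try.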