Schema update while writing to Avro files


Question


Context: We have a Dataflow job that transforms PubSub messages into Avro GenericRecords and writes them into GCS as ".avro". The transformation between PubSub messages and GenericRecords requires a schema. This schema changes weekly with field additions only. We want to be able to update the fields without updating the Dataflow job.


What we did: We took the advice from this post and created a Guava Cache that refreshes its content every minute. The refresh function pulls the schema from GCS. FileIO.write then queries the Guava Cache to get the latest schema and uses it to transform the elements into GenericRecords. FileIO.write also outputs to an Avro sink, which is created with the same schema.
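The post doesn't show how the Guava Cache itself is built, so here is a minimal JDK-only sketch of the same refresh-after-write idea: a holder that re-loads its value (e.g. the schema fetched from GCS) once the value is older than a TTL. The class name and loader are illustrative, not from the original.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Sketch of the refresh pattern described above (hypothetical class, JDK only):
// the cached value is re-fetched from the loader once it is older than the TTL.
public class RefreshingHolder<T> {
    private final Supplier<T> loader;   // e.g. () -> fetchSchemaFromGcs()
    private final Duration ttl;         // e.g. Duration.ofMinutes(1)
    private volatile T value;
    private volatile Instant loadedAt = Instant.MIN;

    public RefreshingHolder(Supplier<T> loader, Duration ttl) {
        this.loader = loader;
        this.ttl = ttl;
    }

    public synchronized T get() {
        // Reload if we have never loaded, or if the cached value has expired.
        if (value == null || Instant.now().isAfter(loadedAt.plus(ttl))) {
            value = loader.get();
            loadedAt = Instant.now();
        }
        return value;
    }
}
```

In the real job, Guava's `CacheBuilder.refreshAfterWrite` additionally reloads asynchronously while serving the stale value, which avoids blocking the bundle on a GCS round trip.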

Here is the code:

genericRecordsAsByteArrays.apply(FileIO.<byte[]>write()
    .via(fn((input, c) -> {
          Map<String, Object> schemaInfo = cache.get("");
          Descriptors.Descriptor paymentRecordFd =
              (Descriptors.Descriptor) schemaInfo.get(DESCRIPTOR_KEY);
          DynamicMessage paymentRecordMsg = DynamicMessage.parseFrom(paymentRecordFd, input);
          Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);

          //From concrete PaymentRecord bytes to DynamicMessage
          try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
            ProtobufDatumWriter<DynamicMessage> pbWriter = new ProtobufDatumWriter<>(schema);
            pbWriter.write(paymentRecordMsg, encoder);
            encoder.flush();

            // From dynamic message to GenericRecord
            byte[] avroContents = output.toByteArray();
            DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroContents, null);
            return reader.read(null, decoder);
          }
        }, requiresSideInputs()),
        fn((output, c) -> {
          Map<String, Object> schemaInfo = cache.get("");
          Schema schema = (Schema) schemaInfo.get(SCHEMA_KEY);
          return AvroIO.sink(schema).withCodec(CodecFactory.snappyCodec());
        }, requiresSideInputs()))
    .withNumShards(5)
    .withNaming(new PerWindowFilenames(baseDir, ".avro"))
    .to(baseDir.toString()));

My questions:

  1. What happens if we are in the middle of writing an Avro file when a schema update occurs, so that we are now writing records with the new schema into an Avro file created with the old schema?
  2. Does Dataflow start a new file when it sees the new schema?
  3. Does Dataflow ignore the new schema and its additional fields until a new file is created?


Each Avro file has its own schema at the very beginning of the file, so I am not sure what the expected behavior is.

Answer


now we are writing the new schema into an Avro file created with the old schema


It's not possible. Each Avro file only has one schema. If it changes, by definition, you'd be writing to a new file.
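This can be seen directly with the Avro Java library (assumed available here; the record shape is illustrative): the writer schema is embedded once in the container file's header when the file is created, and every record in that file is decoded against it, so one file can only ever carry one schema.

```java
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

// Sketch: an Avro container file stores its writer schema in the header,
// and a reader recovers that schema from the file, not from the caller.
public class AvroHeaderDemo {
  static boolean headerSchemaMatches() throws Exception {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Payment\",\"fields\":"
            + "[{\"name\":\"id\",\"type\":\"string\"}]}");

    GenericRecord rec = new GenericData.Record(schema);
    rec.put("id", "p-1");

    File f = File.createTempFile("payments", ".avro");
    f.deleteOnExit();
    try (DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
      writer.create(schema, f);  // the schema is written into the header here
      writer.append(rec);
    }

    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(f, new GenericDatumReader<GenericRecord>())) {
      // The schema comes back from the file header.
      return reader.getSchema().equals(schema);
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(headerSchemaMatches());
  }
}
```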


I doubt Dataflow ignores fields.
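Because the weekly changes are additions only, Avro's schema resolution also covers the read side: a consumer holding the new schema can decode bytes written with the old one, filling added fields from their defaults. A sketch (assumes the Avro Java library; field names are illustrative):

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Sketch: schema resolution reads old-schema bytes with a new reader schema;
// the added field must declare a default for this to work.
public class SchemaEvolutionDemo {
  static String decodeWithNewSchema() throws Exception {
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Payment\",\"fields\":"
            + "[{\"name\":\"id\",\"type\":\"string\"}]}");
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Payment\",\"fields\":"
            + "[{\"name\":\"id\",\"type\":\"string\"},"
            + "{\"name\":\"currency\",\"type\":\"string\",\"default\":\"USD\"}]}");

    // Encode a record with the old schema.
    GenericRecord rec = new GenericData.Record(oldSchema);
    rec.put("id", "p-1");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(oldSchema).write(rec, encoder);
    encoder.flush();

    // Decode with (writerSchema, readerSchema): the added field gets its default.
    GenericDatumReader<GenericRecord> reader =
        new GenericDatumReader<>(oldSchema, newSchema);
    GenericRecord decoded = reader.read(
        null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
    return decoded.get("currency").toString();
  }

  public static void main(String[] args) throws Exception {
    System.out.println(decodeWithNewSchema());  // USD (from the default)
  }
}
```

This is why "additions only, with defaults" is the safe evolution policy for this pipeline: old files stay readable no matter how often the schema grows.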

