How do I get a dataframe or database write from TFX BulkInferrer?

Problem description

I'm very new to TFX, but have an apparently-working ML Pipeline which is to be used via BulkInferrer. That seems to produce output exclusively in Protobuf format, but since I'm running bulk inference I want to pipe the results to a database instead. (DB output seems like it should be the default for bulk inference, since both Bulk Inference & DB access take advantage of parallelization... but Protobuf is a per-record, serialized format.)

I assume I could use something like Parquet-Avro-Protobuf to do the conversion (though that's in Java and the rest of the pipeline's in Python), or I could write something myself to consume all the protobuf messages one by one, convert them into JSON, deserialize the JSON into a list of dicts, and load the dicts into a Pandas DataFrame, or store them as a bunch of key-value pairs which I treat like a single-use DB... but that sounds like a lot of work and pain involving parallelization and optimization for a very common use case. The top-level Protobuf message definition is Tensorflow's PredictionLog.
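For concreteness, the manual route just described might look like the following minimal, single-threaded sketch. It assumes the BulkInferrer wrote gzipped TFRecords of PredictionLog protos; the file path and shard name are placeholders:

import pandas as pd
import tensorflow as tf
from google.protobuf.json_format import MessageToDict
from tensorflow_serving.apis import prediction_log_pb2

# Placeholder path: point this at the BulkInferrer's inference_result artifact.
records = tf.data.TFRecordDataset(
    '/path/to/bulkinferrer/inference_result/prediction_logs-00000-of-00001.gz',
    compression_type='GZIP')

rows = []
for raw in records:
    log = prediction_log_pb2.PredictionLog()
    log.ParseFromString(raw.numpy())
    rows.append(MessageToDict(log))  # one nested dict per record

df = pd.DataFrame(rows)  # exactly the serial, unoptimized loop described above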

This must be a common use case, because TensorFlowModelAnalytics functions like this one consume Pandas DataFrames. I'd rather be able to write directly to a DB (preferably Google BigQuery), or a Parquet file (since Parquet / Spark seems to parallelize better than Pandas), and again, those seem like they should be common use cases, but I haven't found any examples. Maybe I'm using the wrong search terms?
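For reference, once a DataFrame exists, both of those sinks are one-liners in pandas (pyarrow and pandas-gbq are assumed to be installed; the table and project names below are placeholders):

import pandas as pd

df = pd.DataFrame({'prediction': [0.1, 0.9]})  # stand-in for real results
df.to_parquet('predictions.parquet')           # requires pyarrow or fastparquet
df.to_gbq('my_dataset.predictions', project_id='my-gcp-project')  # requires pandas-gbq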

I also looked at the PredictExtractor, since "extracting predictions" sounds close to what I want... but the official documentation appears silent on how that class is supposed to be used. I thought TFTransformOutput sounded like a promising verb, but instead it's a noun.

I'm clearly missing something fundamental here. Is there a reason no one wants to store BulkInferrer results in a database? Is there a configuration option that allows me to write the results to a DB? Maybe I want to add a ParquetIO or BigQueryIO instance to the TFX pipeline? (TFX docs say it uses Beam "under the hood" but that doesn't say much about how I should use them together.) But the syntax in those documents looks sufficiently different from my TFX code that I'm not sure if they're compatible?
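To illustrate what pairing the two might look like, here is an assumption-heavy sketch of a standalone Beam pipeline that drains BulkInferrer's PredictionLog output into BigQuery. The path, table name, and one-column JSON schema are all placeholders, and it sidesteps flattening the nested proto:

import json

import apache_beam as beam
from google.protobuf.json_format import MessageToDict
from tensorflow_serving.apis import prediction_log_pb2

def to_row(log):
    # Crude placeholder: store each PredictionLog as one JSON string column.
    return {'raw_json': json.dumps(MessageToDict(log))}

with beam.Pipeline() as p:
    (p
     | 'ReadPredictionLogs' >> beam.io.ReadFromTFRecord(
         '/path/to/bulkinferrer/inference_result/*.gz',
         coder=beam.coders.ProtoCoder(prediction_log_pb2.PredictionLog))
     | 'ToRows' >> beam.Map(to_row)
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         'my-project:my_dataset.predictions',
         schema='raw_json:STRING',
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))

Storing the whole record as a JSON string keeps the sketch honest; a real pipeline would flatten the fields it cares about into typed columns.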

Help?

Recommended answer

(Copied from the related issue for greater visibility)

After some digging, here is an alternative approach, which assumes no knowledge of the feature_spec beforehand. Do the following:

  • Set the BulkInferrer to write to output_examples rather than inference_result by adding an output_example_spec to the component construction.
  • Add a StatisticsGen and a SchemaGen component in the main pipeline right after the BulkInferrer to generate a schema for the aforementioned output_examples.
  • Use the artifacts from SchemaGen and BulkInferrer to read the TFRecords and do whatever is necessary.
from tfx.components import BulkInferrer, SchemaGen, StatisticsGen
from tfx.proto import bulk_inferrer_pb2

bulk_inferrer = BulkInferrer(
    ....
    output_example_spec=bulk_inferrer_pb2.OutputExampleSpec(
        output_columns_spec=[bulk_inferrer_pb2.OutputColumnsSpec(
            predict_output=bulk_inferrer_pb2.PredictOutput(
                output_columns=[bulk_inferrer_pb2.PredictOutputCol(
                    output_key='original_label_name',
                    output_column='output_label_column_name')]))]
    ))

# Compute statistics and infer a schema over the written output_examples.
statistics = StatisticsGen(
    examples=bulk_inferrer.outputs['output_examples']
)

schema = SchemaGen(
    statistics=statistics.outputs['statistics'],
)

After that, one can do the following:

import tensorflow as tf
from tfx.utils import io_utils
from tensorflow_transform.tf_metadata import schema_utils

# read schema from SchemaGen
schema_path = '/path/to/schemagen/schema.pbtxt'
schema_proto = io_utils.SchemaReader().read(schema_path)
spec = schema_utils.schema_as_feature_spec(schema_proto).feature_spec

# read inferred results
data_files = ['/path/to/bulkinferrer/output_examples/examples/examples-00000-of-00001.gz']
dataset = tf.data.TFRecordDataset(data_files, compression_type='GZIP')

# parse dataset with spec
def parse(raw_record):
    return tf.io.parse_example(raw_record, spec)

dataset = dataset.map(parse)

At this point, the dataset is like any other parsed dataset, so it's trivial to write a CSV or a BigQuery table or whatever from there. It certainly helped us in ZenML with our BatchInferencePipeline (https://github.com/maiot-io/zenml/blob/newpipelines/zenml/core/pipelines/infer_pipeline.py#L150).
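To make that last step concrete, here is one hedged way to finish from the parsed dataset built above, assuming the feature spec yields dense (fixed-length) tensors and volumes small enough for memory; the table names are placeholders:

import pandas as pd

# Materialize the parsed dataset row by row (fine for modest volumes;
# stream batches instead for big jobs).
rows = []
for example in dataset:
    rows.append({name: tensor.numpy() for name, tensor in example.items()})

df = pd.DataFrame(rows)
df.to_csv('predictions.csv', index=False)
# or, with pandas-gbq installed:
# df.to_gbq('my_dataset.predictions', project_id='my-gcp-project')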
