Validating rows before inserting into BigQuery from Dataflow


Problem Description


According to "How do we set maximum_bad_records when loading a Bigquery table from dataflow?", there is currently no way to set the maxBadRecords configuration when loading data into BigQuery from Dataflow. The suggestion is to validate the rows in the Dataflow job before inserting them into BigQuery.

If I have the TableSchema and a TableRow, how do I go about making sure that the row can safely be inserted into the table?

There must be an easier way of doing this than iterating over the fields in the schema, looking at their type and looking at the class of the value in the row, right? That seems error-prone, and the method must be fool-proof since the whole pipeline fails if a single row cannot be loaded.

Update:

My use case is an ETL job that at first will run on JSON (one object per line) logs on Cloud Storage and write to BigQuery in batch, but later will read objects from PubSub and write to BigQuery continuously. The objects contain a lot of information that isn't necessary to have in BigQuery and also contain parts that aren't even possible to describe in a schema (basically free-form JSON payloads). Things like timestamps also need to be formatted to work with BigQuery. There will be a few variants of this job running on different inputs and writing to different tables.

In theory it's not a very difficult process: it takes an object, extracts a few properties (50-100), formats some of them and outputs the object to BigQuery. I more or less just loop over a list of property names, extract the value from the source object, look at a config to see if the property should be formatted somehow, apply the formatting if necessary (this could be downcasing, dividing a millisecond timestamp by 1000, extracting the hostname from a URL, etc.), and write the value to a TableRow object.
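To make the shape of this concrete, here is a rough sketch of that loop. PropertyConfig and Format are made-up names standing in for my real configuration, and only two of the formatting rules are shown:

```java
import com.google.api.services.bigquery.model.TableRow;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// A minimal sketch of the property-extraction loop described above. PropertyConfig
// and Format are hypothetical names standing in for the job's real configuration.
public class RowBuilder {

  enum Format { NONE, DOWNCASE, MILLIS_TO_SECONDS }

  static class PropertyConfig {
    final String name;
    final Format format;

    PropertyConfig(String name, Format format) {
      this.name = name;
      this.format = format;
    }
  }

  static TableRow buildRow(Map<String, Object> source, List<PropertyConfig> properties) {
    TableRow row = new TableRow();
    for (PropertyConfig property : properties) {
      Object value = source.get(property.name);
      if (value == null) {
        continue; // property missing from this object
      }
      switch (property.format) {
        case DOWNCASE:
          value = value.toString().toLowerCase(Locale.ROOT);
          break;
        case MILLIS_TO_SECONDS:
          // Assumes the value really is a number; messy input will throw here.
          value = ((Number) value).longValue() / 1000;
          break;
        default:
          break;
      }
      row.set(property.name, value);
    }
    return row;
  }
}
```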

My problem is that the data is messy. With a couple of hundred million objects there are some that don't look as expected; it's rare, but at these volumes rare things still happen. Sometimes a property that should contain a string contains an integer, or vice versa. Sometimes there's an array or an object where there should be a string.

Ideally I would like to take my TableRow and pass it through the TableSchema and ask "does this work?".

Since this isn't possible, what I do instead is look at the TableSchema object and try to validate/cast the values myself. If the TableSchema says a property is of type STRING I run value.toString() before adding it to the TableRow. If it's an INTEGER I check that it's an Integer, Long or BigInteger, and so on. The problem with this method is that I'm just guessing what will work in BigQuery. What Java data types will it accept for FLOAT? For TIMESTAMP? I think my validations/casts catch most problems, but there are always exceptions and edge cases.
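What I do today looks roughly like the sketch below. The Java types accepted for each BigQuery type are my guesses rather than anything documented, and RECORD/REPEATED fields are left out:

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import java.math.BigInteger;

// A rough sketch of the per-field validation/cast described above. Which Java
// types BigQuery actually accepts for each schema type is a guess here, not a
// documented contract; RECORD and REPEATED fields are ignored for brevity.
public class FieldCoercer {

  static Object coerce(TableFieldSchema field, Object value) {
    switch (field.getType()) {
      case "STRING":
        return value.toString();
      case "INTEGER":
        if (value instanceof Integer || value instanceof Long || value instanceof BigInteger) {
          return value;
        }
        throw new IllegalArgumentException(field.getName() + ": expected an integer, got " + value);
      case "FLOAT":
        if (value instanceof Number) {
          return value; // assumption: any Number works for FLOAT
        }
        throw new IllegalArgumentException(field.getName() + ": expected a number, got " + value);
      case "TIMESTAMP":
        // Assumption: a formatted string (or epoch seconds) is accepted; this is
        // exactly the kind of guess the question is about.
        return value.toString();
      default:
        return value; // BOOLEAN, RECORD, etc. left as-is in this sketch
    }
  }
}
```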

In my experience, which is very limited, the whole pipeline (job? workflow? not sure about the correct term) fails if a single row fails BigQuery's validations (just like a regular load does unless maxBadRecords is set to a sufficiently large number). It also fails with superficially helpful messages like 'BigQuery import job "dataflow_job_xxx" failed. Causes: (5db0b2cdab1557e0): BigQuery job "dataflow_job_xxx" in project "xxx" finished with error(s): errorResult: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field'. Perhaps there is somewhere I can see a more detailed error message that could tell me which property it was and what the value was? Without that information it could just as well have said "bad data".

From what I can tell, at least when running in batch mode, Dataflow will write the TableRow objects to the staging area in Cloud Storage and then start a load once everything is there. This means that there is nowhere for me to catch any errors; by the time BigQuery loads the data my code is no longer running. I haven't run any job in streaming mode yet, but I'm not sure how it would be different there; from my (admittedly limited) understanding the basic principle is the same, it's just the batch size that's smaller.

People use Dataflow and BigQuery, so it can't be impossible to make this work without always having to worry about the whole pipeline stopping because of a single bad input. How do people do it?

Solution

I'm assuming you deserialize the JSON from the file as a Map<String, Object>. Then you should be able to recursively type-check it with a TableSchema.

I'd recommend an iterative approach to developing your schema validation, with the following two steps.

  1. Write a PTransform<Map<String, Object>, TableRow> that converts your JSON rows to TableRow objects. The TableSchema should also be a constructor argument to the function. You can start off making this function really strict -- require, for instance, that the parsed JSON value is already an Integer when the BigQuery schema says INTEGER -- and aggressively declare records in error. Basically, ensure that no invalid records are output by being super-strict in your handling. (A combined sketch of steps 1 and 2 follows this list.)

    Our code here does something somewhat similar -- given a file produced by BigQuery and written as JSON to GCS, we recursively walk the schema and do some type conversions. However, we do not need to validate, because BigQuery itself wrote the data.

    Note that the TableSchema object is not Serializable. We've worked around this by converting the TableSchema to a JSON String in a DoFn or PTransform constructor and back again. See the code in BigQueryIO.java that uses the jsonTableSchema variable.

  2. Use the "dead-letter" strategy described in this blog post to handle bad records -- side output the offending Map<String, Object> rows from your PTransform and write them to a file. That way, you can inspect the rows that failed your validation later.
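Here is a rough sketch of steps 1 and 2 put together, written against the pre-Beam Dataflow Java SDK 1.x (method and class names differ in later Apache Beam releases). The per-field checks are only illustrative:

```java
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import java.util.Map;

// Sketch of steps 1 and 2: convert Map<String, Object> rows to TableRow as
// strictly as possible and side-output anything that fails, so bad records end
// up in a dead-letter output instead of failing the whole load job.
class MapToTableRowFn extends DoFn<Map<String, Object>, TableRow> {

  // Dead-letter side output for rows that fail validation.
  static final TupleTag<String> BAD_RECORDS = new TupleTag<String>() {};

  // TableSchema is not Serializable, so it is carried as a JSON string.
  private final String jsonTableSchema;
  private transient TableSchema schema;

  MapToTableRowFn(String jsonTableSchema) {
    this.jsonTableSchema = jsonTableSchema;
  }

  @Override
  public void processElement(ProcessContext c) throws Exception {
    if (schema == null) {
      schema = JacksonFactory.getDefaultInstance()
          .fromString(jsonTableSchema, TableSchema.class);
    }
    Map<String, Object> input = c.element();
    try {
      TableRow row = new TableRow();
      for (TableFieldSchema field : schema.getFields()) {
        Object value = input.get(field.getName());
        if (value != null) {
          row.set(field.getName(), checkedValue(field, value));
        }
      }
      c.output(row);
    } catch (IllegalArgumentException e) {
      // Step 2: side output the offending row so it can be inspected later.
      c.sideOutput(BAD_RECORDS, input + " : " + e.getMessage());
    }
  }

  // Deliberately strict, as suggested in step 1: only values that already have
  // the expected Java type pass. Extend with the remaining BigQuery types
  // (TIMESTAMP, RECORD, REPEATED, ...) as needed.
  private static Object checkedValue(TableFieldSchema field, Object value) {
    String type = field.getType();
    boolean ok =
        ("STRING".equals(type) && value instanceof String)
            || ("INTEGER".equals(type) && (value instanceof Integer || value instanceof Long))
            || ("FLOAT".equals(type) && value instanceof Number)
            || ("BOOLEAN".equals(type) && value instanceof Boolean);
    if (!ok) {
      throw new IllegalArgumentException(
          field.getName() + " is not a valid " + type + ": " + value);
    }
    return value;
  }
}
```

This DoFn would be applied with ParDo's multi-output form (a main TupleTag<TableRow> plus a TupleTagList containing BAD_RECORDS), which yields a PCollectionTuple; the bad-records collection can then be written out with TextIO and inspected after the run.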

You might start with some small files and use the DirectPipelineRunner rather than the DataflowPipelineRunner. The direct runner runs the pipeline on your computer, rather than on the Google Cloud Dataflow service, and it uses BigQuery streaming writes. I believe when those writes fail you will get better error messages.
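For example, with the 1.x SDK a local test run can be forced like this (the same thing is normally done with --runner=DirectPipelineRunner on the command line; class names differ in later Beam SDKs):

```java
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;

// Minimal local-test harness: force the direct runner so the pipeline runs on
// your own machine against small sample files.
public class LocalTestMain {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    options.setRunner(DirectPipelineRunner.class);
    Pipeline pipeline = Pipeline.create(options);
    // ... add the JSON -> TableRow transform and a small BigQueryIO.Write here ...
    pipeline.run();
  }
}
```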

(We use the GCS->BigQuery Load Job pattern for Batch jobs because it's much more efficient and cost-effective, but BigQuery streaming writes in Streaming jobs because they are low-latency.)

Finally, in terms of logging information:

  • Definitely check Cloud Logging (by following the Worker Logs link on the logs panel).
  • You may get better information about why the load jobs triggered by your Batch Dataflows fail if you run the bq command-line utility: bq show -j PROJECT:dataflow_job_XXXXXXX.
