Google Dataflow: insert + update in BigQuery in a streaming pipeline



The main objective

A Python streaming pipeline in which I read the input from Pub/Sub.

After the input is analyzed, two options are available:

  • If x=1 -> insert
  • If x=2 -> update

Testing

  • This cannot be done using Apache Beam functions, so it needs to be developed using the 0.25 API of the BigQuery client library (currently the version supported in Google Dataflow)

The problem

  • The inserted records are still in the BigQuery streaming buffer, so the update statement fails:

         UPDATE or DELETE statement over table table would affect rows in the streaming buffer, which is not supported
    


The code

Insert

def insertCanonicalBQ(input):
    from google.cloud import bigquery
    client = bigquery.Client(project='project')
    dataset = client.dataset('dataset')
    table = dataset.table('table')
    table.reload()  # fetch the table metadata/schema so insert_data() knows the columns
    table.insert_data(rows=[[values]])  # streaming insert of one row; `values` is a placeholder for the row contents

Update

def UpdateBQ(input):
    from google.cloud import bigquery
    import uuid
    import time
    client = bigquery.Client()
    QUERY = "#standardSQL" + "\n" + """UPDATE table SET field1 = 'XXX' WHERE field2 = 'YYY'"""
    query_job = client.run_async_query(query=QUERY, job_name='temp-query-job_{}'.format(uuid.uuid4()))  # API request
    query_job.use_legacy_sql = False
    query_job.begin()
    while True:
        query_job.reload()  # Refreshes the job state via a GET request.
        if query_job.state == 'DONE':
            if query_job.error_result:
                raise RuntimeError(query_job.errors)
            print("done")
            return input
        time.sleep(1)  # wait before polling the job state again
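For reference, here is a minimal sketch (not from the original post) of how these two functions might be wired into the streaming pipeline, branching on the x value described above; the Pub/Sub topic, the JSON parsing and the field name x are assumptions:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def insert_or_update(message):
    # Hypothetical dispatch: parse the Pub/Sub payload and branch on x.
    record = json.loads(message)
    if record.get('x') == 1:
        return insertCanonicalBQ(record)  # streaming insert (defined above)
    elif record.get('x') == 2:
        return UpdateBQ(record)           # DML update (defined above)


options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromPubSub(topic='projects/project/topics/mytopic')
     | 'InsertOrUpdate' >> beam.Map(insert_or_update))

Note that every x=2 element triggers a blocking DML job in UpdateBQ, which is exactly where the streaming-buffer error above shows up.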

Solution

Even if the row wasn't in the streaming buffer, this still wouldn't be the way to approach this problem in BigQuery. BigQuery storage is better suited for bulk mutations rather than mutating individual entities like this via UPDATE. Your pattern is aligned with something I'd expect from a transactional rather than an analytical use case.

Consider an append-based pattern for this. Each time you process an entity message, write it to BigQuery via a streaming insert. Then, when needed, you can get the latest version of all entities via a query.
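As a rough illustration of this append-only pattern (a sketch, not the answer's code; the topic, table and field names are hypothetical), every message simply becomes a new row:

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def to_row(message):
    # Hypothetical mapping from a Pub/Sub payload to a BigQuery row.
    record = json.loads(message)
    return {
        'idfield': record['id'],          # unique entity key
        'message_time': record['time'],   # when the message was emitted
        'field1': record.get('field1'),   # ...plus any other entity fields
    }


options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromPubSub(topic='projects/myproject/topics/mytopic')
     | 'ToRow' >> beam.Map(to_row)
     | 'Append' >> beam.io.WriteToBigQuery(
           'myproject:mydata.mytable',
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
           create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))  # table assumed to exist

With this, the x=1 / x=2 branch disappears: inserts and updates are both just appends, and deduplication is deferred to query time as shown below.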

As an example, let's assume an arbitrary schema: idfield is your unique entity key/identifier, and message_time represents the time the message was emitted. Your entities may have many other fields. To get the latest version of the entities, we could run the following query (and possibly write this to another table):

#standardSQL
SELECT
  idfield,
  ARRAY_AGG(
    t ORDER BY message_time DESC LIMIT 1
  )[OFFSET(0)].* EXCEPT (idfield)
FROM `myproject.mydata.mytable` AS t
GROUP BY idfield
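If you do want to materialize that result into another table, one option (a sketch assuming a current google-cloud-bigquery client rather than the 0.25 API pinned by Dataflow above; the destination table name is hypothetical) is to run the query with a destination table, for example from a scheduled job:

from google.cloud import bigquery

LATEST_SQL = """
SELECT
  idfield,
  ARRAY_AGG(t ORDER BY message_time DESC LIMIT 1)[OFFSET(0)].* EXCEPT (idfield)
FROM `myproject.mydata.mytable` AS t
GROUP BY idfield
"""

client = bigquery.Client(project='myproject')
job_config = bigquery.QueryJobConfig(
    destination='myproject.mydata.mytable_latest',  # hypothetical target table
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE)
client.query(LATEST_SQL, job_config=job_config).result()  # wait for the job to finish

Standard SQL is the default dialect in recent client versions, so the #standardSQL prefix is not needed here.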

An additional advantage of this approach is that it also allows you to perform analysis at arbitrary points in time. To analyze the entities as of their state an hour ago, simply add a WHERE clause: WHERE message_time <= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
