Google App Engine: Using Big Query on datastore?


Problem Description


Have a GAE datastore kind with several hundred thousand objects in it. Want to do several involved queries (involving counting queries). Big Query seems a good fit for doing this.

Is there currently an easy way to query a live AppEngine Datastore using Big Query?

Solution

You can't run BigQuery directly on DataStore entities, but you can write a Mapper Pipeline that reads entities out of DataStore, writes them to CSV in Google Cloud Storage, and then ingests those into BigQuery - you can even automate the process. Here's an example of using the Mapper API classes for just the DataStore-to-CSV step:

import re
import time
import logging
from datetime import datetime
import urllib
import httplib2
import pickle

from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import webapp

from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template

from mapreduce.lib import files
from google.appengine.api import taskqueue
from google.appengine.api import users

from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op

from apiclient.discovery import build
from google.appengine.api import memcache
from oauth2client.appengine import AppAssertionCredentials


#Number of shards to use in the Mapper pipeline
SHARDS = 20

# Name of the project's Google Cloud Storage Bucket
GS_BUCKET = 'your bucket'

# DataStore Model
class YourEntity(db.Expando):
  field1 = db.StringProperty() # etc, etc

ENTITY_KIND = 'main.YourEntity'


class MapReduceStart(webapp.RequestHandler):
  """Handler that provides link for user to start MapReduce pipeline.
  """
  def get(self):
    pipeline = IteratorPipeline(ENTITY_KIND)
    pipeline.start()
    path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
    logging.info('Redirecting to: %s' % path)
    self.redirect(path)


class IteratorPipeline(base_handler.PipelineBase):
  """ A pipeline that iterates through datastore
  """
  def run(self, entity_type):
    output = yield mapreduce_pipeline.MapperPipeline(
      "DataStore_to_Google_Storage_Pipeline",
      "main.datastore_map",
      "mapreduce.input_readers.DatastoreInputReader",
      output_writer_spec="mapreduce.output_writers.FileOutputWriter",
      params={
          "input_reader":{
              "entity_kind": entity_type,
              },
          "output_writer":{
              "filesystem": "gs",
              "gs_bucket_name": GS_BUCKET,
              "output_sharding":"none",
              }
          },
          shards=SHARDS)


def datastore_map(entity_type):
  """Maps one datastore entity to a single quoted, comma-separated CSV row."""
  props = GetPropsFor(entity_type)
  data = db.to_dict(entity_type)
  result = ','.join(['"%s"' % str(data.get(k)) for k in props])
  yield('%s\n' % result)


def GetPropsFor(entity_or_kind):
  """Returns the declared model properties for an entity instance or kind name."""
  if (isinstance(entity_or_kind, basestring)):
    kind = entity_or_kind
  else:
    kind = entity_or_kind.kind()
  cls = globals().get(kind)
  return cls.properties()


application = webapp.WSGIApplication(
                                     [('/start', MapReduceStart)],
                                     debug=True)

def main():
  run_wsgi_app(application)

if __name__ == "__main__":
  main()
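
To make the CSV step concrete, here is a small sketch of my own (not part of the original answer) showing what datastore_map yields for a single entity of the YourEntity model above; the field value 'hello' is made up:

# Illustration only: assumes the module above is importable and the App Engine
# datastore stubs are available (for example in the remote_api shell).
entity = YourEntity(field1='hello')

# GetPropsFor(entity) resolves the kind name to the YourEntity class and returns
# its declared properties ({'field1': ...}); datastore_map then joins the
# corresponding values into one quoted, comma-separated row per entity.
for row in datastore_map(entity):
  print row   # prints: "hello" (the row already ends with its own newline)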

If you append yield CloudStorageToBigQuery(output) to the end of your IteratorPipeline's run method, you can pipe the resulting CSV file handles into a BigQuery ingestion pipeline, like this:

# BigQuery API Settings (module-level so build_job_data below can see them)
SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'Some_ProjectXXXX'
DATASET_ID = 'Some_DATASET'


class CloudStorageToBigQuery(base_handler.PipelineBase):
  """A Pipeline that kicks off a BigQuery ingestion job.
  """
  def run(self, output):
    # Create a new API service for interacting with BigQuery
    credentials = AppAssertionCredentials(scope=SCOPE)
    http = credentials.authorize(httplib2.Http())
    bigquery_service = build("bigquery", "v2", http=http)

    jobs = bigquery_service.jobs()
    table_name = 'datastore_dump_%s' % datetime.utcnow().strftime(
        '%m%d%Y_%H%M%S')
    # FileOutputWriter reports paths as /gs/<bucket>/<file>; BigQuery
    # expects gs://<bucket>/<file> source URIs.
    files = [str(f.replace('/gs/', 'gs://')) for f in output]
    result = jobs.insert(projectId=PROJECT_ID,
                         body=build_job_data(table_name, files)).execute()
    logging.info(result)

def build_job_data(table_name, files):
  return {"projectId": PROJECT_ID,
          "configuration":{
              "load": {
                  "sourceUris": files,
                  "schema":{
                      # put your schema here
                      "fields": fields
                      },
                  "destinationTable":{
                      "projectId": PROJECT_ID,
                      "datasetId": DATASET_ID,
                      "tableId": table_name,
                      },
                  }
              }
          }
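
For completeness, here is a sketch (based on the code above, not copied from the answer) of what IteratorPipeline.run looks like once the ingestion step is chained on - the file list yielded by the MapperPipeline is handed straight to CloudStorageToBigQuery:

class IteratorPipeline(base_handler.PipelineBase):
  """Iterates through the datastore, then kicks off BigQuery ingestion.
  """
  def run(self, entity_type):
    output = yield mapreduce_pipeline.MapperPipeline(
        "DataStore_to_Google_Storage_Pipeline",
        "main.datastore_map",
        "mapreduce.input_readers.DatastoreInputReader",
        output_writer_spec="mapreduce.output_writers.FileOutputWriter",
        params={
            "input_reader": {"entity_kind": entity_type},
            "output_writer": {
                "filesystem": "gs",
                "gs_bucket_name": GS_BUCKET,
                "output_sharding": "none",
                }
            },
        shards=SHARDS)
    # Chain the BigQuery load job onto the CSV files written above.
    yield CloudStorageToBigQuery(output)

Once the load job finishes, the counting queries from the question can be run against the new table. A minimal, hedged sketch using the BigQuery v2 jobs.query call and the bigquery_service handle built as in CloudStorageToBigQuery above (the column name some_field is hypothetical; PROJECT_ID and DATASET_ID are the constants defined earlier):

def count_by_field(bigquery_service, table_name):
  """Runs a simple grouped COUNT(*) over the ingested dump table."""
  query = ('SELECT some_field, COUNT(*) AS n '
           'FROM [%s.%s] GROUP BY some_field' % (DATASET_ID, table_name))
  response = bigquery_service.jobs().query(
      projectId=PROJECT_ID,
      body={'query': query}).execute()
  return response.get('rows', [])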
