Make a GROUP BY with MapReduce in App Engine


Problem description


      I'm looking for a way to perform a GROUP BY in a Datastore query using MapReduce. AFAIK App Engine's GQL doesn't support GROUP BY itself, and a good approach suggested by other developers is to use MapReduce.

      I downloaded the source code and have been studying the demo code, and I tried to adapt it to my case, but without success. Here is how I tried to do it. Maybe everything I did is wrong, so I'd be grateful if anyone could help me get it working.


      What I want to do is this: I have a bunch of contacts in the Datastore, and each contact has a date. Many contacts are repeated with the same date. What I want is a simple GROUP BY that gathers identical contacts with the same date.

      E.g.:

      Let's say I have these contacts:

      1. CONTACT_NAME: Foo1 | DATE: 01-10-2012
      2. CONTACT_NAME: Foo2 | DATE: 02-05-2012
      3. CONTACT_NAME: Foo1 | DATE: 01-10-2012

      So after the MapReduce operation it would be something like this:

      1. CONTACT_NAME: Foo1 | DATE: 01-10-2012
      2. CONTACT_NAME: Foo2 | DATE: 02-05-2012
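      The result in this example is effectively a DISTINCT over the (contact name, date) pair. As a minimal in-memory sketch in plain Python (no App Engine APIs; the dictionary field names `contact_name` and `date` are assumptions), the example rows collapse like this:

```python
def distinct_contacts(rows):
    """Drop rows whose (contact_name, date) pair was already seen, keeping first-seen order."""
    seen = set()
    result = []
    for row in rows:
        key = (row["contact_name"], row["date"])
        if key not in seen:
            seen.add(key)
            result.append(row)
    return result


contacts = [
    {"contact_name": "Foo1", "date": "01-10-2012"},
    {"contact_name": "Foo2", "date": "02-05-2012"},
    {"contact_name": "Foo1", "date": "01-10-2012"},
]

for i, row in enumerate(distinct_contacts(contacts), 1):
    print("%d. CONTACT_NAME: %s | DATE: %s" % (i, row["contact_name"], row["date"]))
```

      This prints the two expected lines. It only works when all contacts fit in one request's memory; the same keying idea is what a mapper and reducer have to implement when they don't.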

      For the GROUP BY functionality, I think the word count example does the job.
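      Word count does fit, because it already is a GROUP BY: the mapper emits (key, value) pairs, the shuffle collects every value that shares a key, and the reducer is called once per key with that list. The sketch below simulates the three phases locally in plain Python. The names `contact_group_map`, `contact_group_reduce`, `run_locally` and the `Contact` tuple are hypothetical; the "map yields tuples, reduce takes (key, values) and yields strings" shape follows the word count demo, but this is not the App Engine MapReduce library itself.

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for a datastore Contact entity.
Contact = namedtuple("Contact", ["contact_name", "date"])


def contact_group_map(contact):
    """Map phase: emit one (key, value) pair per entity, keyed by the GROUP BY column."""
    yield (contact.date, contact.contact_name)


def contact_group_reduce(key, values):
    """Reduce phase: called once per key with every value the mappers emitted for it."""
    # Deduplicate names so repeated contacts on the same date collapse to one entry.
    yield "%s: %s\n" % (key, ",".join(sorted(set(values))))


def run_locally(entities):
    """Simulate map -> shuffle -> reduce in memory (on App Engine the shuffler does the grouping)."""
    groups = defaultdict(list)
    for entity in entities:
        for key, value in contact_group_map(entity):
            groups[key].append(value)  # shuffle: collect values by key
    output = []
    for key in sorted(groups):  # sorted only to make the output deterministic
        output.extend(contact_group_reduce(key, groups[key]))
    return output


contacts = [
    Contact("Foo1", "01-10-2012"),
    Contact("Foo2", "02-05-2012"),
    Contact("Foo1", "01-10-2012"),
]
for line in run_locally(contacts):
    print(line.rstrip("\n"))
```

      In a real MapreducePipeline, map and reduce functions like these would be referenced by their dotted module path (the way the code below passes "main.word_count_map"), and the input reader, not a Python list, would supply the entities.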


      EDIT

      The only thing that is shown in the log is:

      /mapreduce/pipeline/run 200

      Running GetContactData.WordCountPipeline((u'2012-02-02',), *{})#da26a9b555e311e19b1e6d324d450c1a

      END EDIT

      If I'm doing something wrong, or using the wrong approach to do a GROUP BY with MapReduce, please show me how to do it with MapReduce.


      Here is my code:

      from Contacts import Contact  # the model class is referenced as Contact below
      from google.appengine.ext import webapp
      from google.appengine.ext.webapp import template
      from google.appengine.ext.webapp.util import run_wsgi_app
      from google.appengine.api import mail
      from google.appengine.ext.db import GqlQuery
      from google.appengine.ext import db
      
      
      from google.appengine.api import taskqueue
      from google.appengine.api import users
      
      from mapreduce.lib import files
      from mapreduce import base_handler
      from mapreduce import mapreduce_pipeline
      from mapreduce import operation as op
      from mapreduce import shuffler
      
      import simplejson, logging, re
      
      
      class GetContactData(webapp.RequestHandler):
      
          # Get the calls based on the user id
          def get(self):
              contactId = self.request.get('contactId')
              query_contacts = Contact.all()
              query_contacts.filter('contact_id =', int(contactId))
              query_contacts.order('-timestamp_')
              contact_data = []
              if query_contacts != None:
                  for contact in query_contacts:
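                          # Note: this starts a separate MapReduce pipeline for every
                          # contact the query returns, each given only that contact's date.
                          pipeline = WordCountPipeline(contact.date)
                          pipeline.start()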
                          record = { "contact_id":contact.contact_id,
                                     "contact_name":contact.contact_name,
                                     "contact_number":contact.contact_number,
                                     "timestamp":contact.timestamp_,
                                     "current_time":contact.current_time_,
                                     "type":contact.type_,
                                     "current_date":contact.date }
                          contact_data.append(record)
      
              self.response.headers['Content-Type'] = 'application/json'
              self.response.out.write(simplejson.dumps(contact_data)) 
      
      class WordCountPipeline(base_handler.PipelineBase):
        """A pipeline that runs the word count demo's MapReduce job.
      
        Args:
          date: the date string passed to the mapper via mapper_params.
        """
      
        def run(self, date):
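          output = yield mapreduce_pipeline.MapreducePipeline(
              "word_count",
              # These are the word count demo's handlers. The demo's map function
              # unpacks (entry, text_fn) tuples produced by BlobstoreZipInputReader,
              # so it does not accept the entities DatastoreInputReader yields.
              "main.word_count_map",
              "main.word_count_reduce",
              "mapreduce.input_readers.DatastoreInputReader",
              "mapreduce.output_writers.BlobstoreOutputWriter",
              # DatastoreInputReader expects an "entity_kind" parameter (the
              # model's dotted path); only "date" is passed here.
              mapper_params={
                  "date": date,
              },
              reducer_params={
                  "mime_type": "text/plain",
              },
              shards=16)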
          yield StoreOutput("WordCount", output)
      
      class StoreOutput(base_handler.PipelineBase):
        """A pipeline to store the result of the MapReduce job in the database.
      
        Args:
          mr_type: the type of mapreduce job run (e.g., WordCount, Index)
          output: the blobstore location where the output of the job is stored
        """
      
        def run(self, mr_type, output):
            logging.info(output) # here I should append the grouped duration in JSON
      
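      The `StoreOutput` stage above only logs the job output. Assuming the reducer writes one `DATE: NAME1,NAME2` line per group (a hypothetical line format chosen for this sketch), turning those lines into the JSON the handler wants to return could look like the following. Reading the lines out of the Blobstore files is App Engine specific and is not shown; the sketch uses the standard-library `json` module, whose `dumps` call matches the `simplejson.dumps` the question imports.

```python
import json


def grouped_lines_to_json(lines):
    """Convert hypothetical reducer lines like 'DATE: NAME1,NAME2' into a JSON object string."""
    grouped = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        date, _, names = line.partition(": ")
        grouped[date] = names.split(",") if names else []
    # sort_keys keeps the output stable regardless of the order shards were read in
    return json.dumps(grouped, sort_keys=True)


reducer_output = ["01-10-2012: Foo1\n", "02-05-2012: Foo2\n"]
print(grouped_lines_to_json(reducer_output))
```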

      Solution

      I based my solution on the code @autumngard provided in this question, modified it to fit my purpose, and it worked.
