在 Google App Engine 中使用 mapreduce 的简单反例 [英] Simple counter example using mapreduce in Google App Engine

查看:21
本文介绍了在 Google App Engine 中使用 mapreduce 的简单反例的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对 GAE 中 mapreduce 支持的当前状态有些困惑.根据文档 http://code.google.com/p/appengine-mapreduce/ 减少阶段是尚不支持,但在 I/O 2011 的会话描述中(http://www.youtube.com/watch?v=EIxelKcyCC0 ) 上面写着现在可以在 App Engine 上运行完整的 Map Reduce 作业".我想知道我是否可以在这个任务中使用 mapreduce:

I'm somewhat confused with the current state of mapreduce support in GAE. According to the docs http://code.google.com/p/appengine-mapreduce/ reduce phase isn't supported yet, but in the description of the session from I/O 2011 ( http://www.youtube.com/watch?v=EIxelKcyCC0 ) it's written "It is now possible to run full Map Reduce jobs on App Engine". I wonder if I can use mapreduce in this task:

我想做的事:

我有带有字段颜色的模型汽车:

I have model Car with field color:

class Car(db.Model):
    color = db.StringProperty()

我想运行 mapreduce 进程(不时,由 cron 定义),它可以计算每种颜色的汽车数量,并将结果存储在数据存储中.似乎是一项非常适合 mapreduce 的工作(但如果我错了,请纠正我),阶段map"将为每个 Car 实体生成对 (, 1),阶段reduce"应该通过 color_name 合并这些数据给我预期的结果.我想得到的最终结果是计算数据存储在数据存储区中的实体,如下所示:

I want to run mapreduce process (from time to time, cron-defined) which can compute how many cars are in each color ans store this result in the datastore. Seems like a job well suited for mapreduce (but if I'm wrong correct me), phase "map" will yield pairs (, 1) for each Car entity, and phase "reduce" should merge this data by color_name giving me expected results. Final result I want to get are entities with computed data stored in the datastore, something like that:

class CarsByColor(db.Model):
    color_name = db.StringProperty()
    cars_num = db.IntegerProperty()

问题:我不知道如何在 appengine 中实现这一点……视频显示了定义了 map 和 reduce 函数的示例,但它们似乎是与数据存储区无关的非常一般的示例.我发现的所有其他示例都使用一个函数来处理来自 DatastoreInputReader 的数据,但它们似乎只是映射"阶段,没有示例说明如何执行减少"(以及如何将减少结果存储在数据存储).

Problem: I don't know how to implement this in appengine ... The video shows examples with defined map and reduce functions, but they seem to be very general examples not related to the datastore. All other examples that i found are using one function to process the data from DatastoreInputReader, but they seem to be only the "map" phase, there is no example of how to do the "reduce" (and how to store reduce results in the datastore).

推荐答案

我在这里提供解决方案,我最终发现使用 GAE 中的 mapreduce(没有 reduce 阶段).如果我从头开始,我可能会使用 Drew Sears 提供的解决方案.

I'm providing here solution I figured out eventually using mapreduce from GAE (without reduce phase). If I had started from scratch I probably would have used solution provided by Drew Sears.

它适用于 GAE python 1.5.0

It works in GAE python 1.5.0

app.yaml 我添加了 mapreduce 的处理程序:

In app.yaml I added the handler for mapreduce:

- url: /mapreduce(/.*)?
  script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py

以及我的 mapreduce 代码的处理程序(我使用 url/mapred_update 来收集 mapreduce 产生的结果):

and the handler for my code for mapreduce (I'm using url /mapred_update to gather the results produced by mapreduce):

- url: /mapred_.*
  script: mapred.py

创建 ma​​preduce.yaml 用于处理 Car 实体:

Created mapreduce.yaml for processing Car entities:

mapreduce:
- name: Color_Counter
  params:
  - name: done_callback
    value: /mapred_update
  mapper:
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader
    handler: mapred.process
    params:
    - name: entity_kind
      default: models.Car

解释:done_callback是一个url,在mapreduce完成操作后调用.ma​​pred.process 是一个处理单个实体和更新计数器的函数(它在 mapred.py 文件中定义).模型 Car 在 models.py

Explanation: done_callback is an url that is called after mapreduce finishes its operations. mapred.process is a function that process individual entity and update counters (it's defined in mapred.py file). Model Car is defined in models.py

ma​​pred.py:

from models import CarsByColor
from google.appengine.ext import db
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce.model import MapreduceState

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

def process(entity):
    """Process individual Car"""
    color = entity.color
    if color:
        yield op.counters.Increment('car_color_%s' % color)

class UpdateCounters(webapp.RequestHandler):
    """Create stats models CarsByColor based on the data 
    gathered by mapreduce counters"""
    def post(self):
        """Called after mapreduce operation are finished"""
        # Finished mapreduce job id is passed in request headers
        job_id = self.request.headers['Mapreduce-Id']
        state = MapreduceState.get_by_job_id(job_id)
        to_put = []
        counters = state.counters_map.counters
        # Remove counter not needed for stats
        del counters['mapper_calls']
        for counter in counters.keys():
            stat = CarsByColor.get_by_key_name(counter)
            if not stat:
                stat = CarsByColor(key_name=counter,
                                name=counter)
            stat.value = counters[counter]
            to_put.append(stat)
        db.put(to_put)

        self.response.headers['Content-Type'] = 'text/plain'
        self.response.out.write('Updated.')


application = webapp.WSGIApplication(
                                     [('/mapred_update', UpdateCounters)],
                                     debug=True)
def main():
    run_wsgi_app(application)

if __name__ == "__main__":
    main()            

与问题相比,CarsByColor 模型的定义略有变化.

There is slightly changed definition of CarsByColor model compared to question.

您可以从 url 手动启动 mapreduce 作业:http://yourapp/mapreduce/ 并希望从 cron (我还没有测试 cron).

You can start the mapreduce job manually from url: http://yourapp/mapreduce/ and hopefully from cron (I haven't tested the cron yet).

这篇关于在 Google App Engine 中使用 mapreduce 的简单反例的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆