Simple counter example using mapreduce in Google App Engine


Question


I'm somewhat confused about the current state of mapreduce support in GAE. According to the docs at http://code.google.com/p/appengine-mapreduce/ the reduce phase isn't supported yet, but the description of the session from I/O 2011 ( http://www.youtube.com/watch?v=EIxelKcyCC0 ) says "It is now possible to run full Map Reduce jobs on App Engine". I wonder whether I can use mapreduce for this task:

What I want to do:

I have a model Car with a field color:

class Car(db.Model):
    color = db.StringProperty()

I want to run a mapreduce process (from time to time, cron-defined) that computes how many cars there are of each color and stores the result in the datastore. It seems like a job well suited to mapreduce (correct me if I'm wrong): the "map" phase would yield a pair (color_name, 1) for each Car entity, and the "reduce" phase would merge this data by color_name, giving me the expected results. The final result I want is a set of entities with the computed data stored in the datastore, something like this:

class CarsByColor(db.Model):
    color_name = db.StringProperty()
    cars_num = db.IntegerProperty()
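
The map and reduce phases described above can be illustrated outside App Engine. The following is a minimal plain-Python sketch under stated assumptions, not the appengine-mapreduce API: map_car, reduce_color and count_by_color are hypothetical names, and cars are represented as plain dicts rather than Car entities.

```python
from collections import defaultdict


def map_car(car):
    """Map phase: emit a (color_name, 1) pair for one car; cars without a color emit nothing."""
    color = car.get("color")
    if color:
        yield (color, 1)


def reduce_color(color_name, values):
    """Reduce phase: merge all values emitted for one color into a single count."""
    return (color_name, sum(values))


def count_by_color(cars):
    """Hypothetical in-memory driver: map, group by key, then reduce."""
    grouped = defaultdict(list)
    for car in cars:
        for color_name, value in map_car(car):
            grouped[color_name].append(value)
    return dict(reduce_color(name, values) for name, values in grouped.items())


# Plain dicts standing in for Car entities (an assumption for illustration).
cars = [{"color": "red"}, {"color": "blue"}, {"color": "red"}, {"color": None}]
result = count_by_color(cars)
```

Each key of the resulting dict would correspond to color_name and each value to cars_num in a CarsByColor entity.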

Problem: I don't know how to implement this in App Engine. The video shows examples with defined map and reduce functions, but they seem to be very general examples not related to the datastore. All the other examples I found use one function to process the data from DatastoreInputReader, but they seem to cover only the "map" phase; there is no example of how to do the "reduce" (and how to store the reduce results in the datastore).

Solution

Here is the solution I eventually figured out, using mapreduce from GAE (without the reduce phase). If I had started from scratch, I probably would have used the solution provided by Drew Sears.

It works in GAE Python 1.5.0.

In app.yaml I added the handler for mapreduce:

- url: /mapreduce(/.*)?
  script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py

and the handler for my own mapreduce code (I'm using the URL /mapred_update to gather the results produced by mapreduce):

- url: /mapred_.*
  script: mapred.py

I created mapreduce.yaml for processing Car entities:

mapreduce:
- name: Color_Counter
  params:
  - name: done_callback
    value: /mapred_update
  mapper:
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader
    handler: mapred.process
    params:
    - name: entity_kind
      default: models.Car

Explanation: done_callback is a URL that is called after mapreduce finishes its operations. mapred.process is a function that processes an individual entity and updates the counters (it's defined in the mapred.py file). The Car model is defined in models.py.

mapred.py:

from models import CarsByColor
from google.appengine.ext import db
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce.model import MapreduceState

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

def process(entity):
    """Process individual Car"""
    color = entity.color
    if color:
        yield op.counters.Increment('car_color_%s' % color)

class UpdateCounters(webapp.RequestHandler):
    """Create stats models CarsByColor based on the data 
    gathered by mapreduce counters"""
    def post(self):
        """Called after mapreduce operation are finished"""
        # Finished mapreduce job id is passed in request headers
        job_id = self.request.headers['Mapreduce-Id']
        state = MapreduceState.get_by_job_id(job_id)
        to_put = []
        counters = state.counters_map.counters
        # Remove counter not needed for stats
        del counters['mapper_calls']
        for counter in counters.keys():
            stat = CarsByColor.get_by_key_name(counter)
            if not stat:
                stat = CarsByColor(key_name=counter,
                                name=counter)
            stat.value = counters[counter]
            to_put.append(stat)
        db.put(to_put)

        self.response.headers['Content-Type'] = 'text/plain'
        self.response.out.write('Updated.')


application = webapp.WSGIApplication(
    [('/mapred_update', UpdateCounters)],
    debug=True)


def main():
    run_wsgi_app(application)


if __name__ == "__main__":
    main()

The definition of the CarsByColor model is slightly changed compared to the one in the question (the handler above sets name and value properties on it).
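
As one possible refinement of the callback above, the counter names could be turned back into plain color names before they are stored. This is only a sketch in plain Python: counters_to_color_counts and COUNTER_PREFIX are hypothetical names, and the sample dict merely stands in for state.counters_map.counters.

```python
COUNTER_PREFIX = "car_color_"  # matches the 'car_color_%s' name used in process()


def counters_to_color_counts(counters):
    """Keep only the per-color counters and strip the prefix from their names."""
    color_counts = {}
    for name, value in counters.items():
        if name.startswith(COUNTER_PREFIX):
            color_counts[name[len(COUNTER_PREFIX):]] = value
    return color_counts


# A plain dict standing in for the counters collected by the mapreduce job.
sample = {"mapper_calls": 3, "car_color_red": 2, "car_color_blue": 1}
color_counts = counters_to_color_counts(sample)
```

Filtering by prefix also skips framework counters such as mapper_calls without having to delete them explicitly.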

You can start the mapreduce job manually from the URL http://yourapp/mapreduce/ and, hopefully, from cron (I haven't tested the cron yet).
