Memory limit hit with appengine-mapreduce

Problem Description

I'm working on appengine-mapreduce function and have modified the demo to fit my purpose. Basically I have a million over lines in the following format: userid, time1, time2. My purpose is to find the difference between time1 and time2 for each userid.

However, as I run this on Google App Engine, I encountered this error message in the logs section:

Exceeded soft private memory limit with 180.56 MB after servicing 130 requests total. While handling this request, the process that handled this request was found to be using too much memory and was terminated. This is likely to cause a new process to be used for the next request to your application. If you see this message frequently, you may have a memory leak in your application.

# Imports assumed at the top of main.py (not shown in the original snippet):
import csv
import logging
import time

def time_count_map(data):
  """Time count map function."""
  (entry, text_fn) = data
  text = text_fn()

  try:
    q = text.split('\n')
    for m in q:
        reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            """Calculate time elapsed"""
            sdw = s[1]
            start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
            edw = s[2]
            end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
  except IndexError, e:
    logging.debug(e)


def time_count_reduce(key, values):
  """Time count reduce function."""
  time = 0.0
  for subtime in values:
    time += float(subtime)
    realtime = int(time)
  yield "%s: %d\n" % (key, realtime)

Can anyone suggest how I can better optimize my code? Thanks!!

Here is the pipeline handler:

class TimeCountPipeline(base_handler.PipelineBase):
  """A pipeline to run Time count demo.

  Args:
    blobkey: blobkey to process as string. Should be a zip archive with
      text files inside.
  """

  def run(self, filekey, blobkey):
    logging.debug("filename is %s" % filekey)
    output = yield mapreduce_pipeline.MapreducePipeline(
        "time_count",
        "main.time_count_map",
        "main.time_count_reduce",
        "mapreduce.input_readers.BlobstoreZipInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_key": blobkey,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=32)
    yield StoreOutput("TimeCount", filekey, output)

mapreduce.yaml:

mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4

The rest of the files are exactly the same as the demo.

I've uploaded a copy of my code to Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip

Recommended Answer

It is likely your input file exceeds the soft memory limit in size. For big files use either BlobstoreLineInputReader or BlobstoreZipLineInputReader.

These input readers pass something different to the map function: they pass the start_position in the file and the line of text.

Your map function would look something like:

def time_count_map(data):
    """Time count map function."""
    text = data[1]

    try:
        reader = csv.reader([text.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            """Calculate time elapsed"""
            sdw = s[1]
            start_date = time.strptime(sdw,"%m/%d/%y %I:%M:%S%p")
            edw = s[2]
            end_date = time.strptime(edw,"%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
    except IndexError, e:
        logging.debug(e)
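
As a quick sanity check (this is my addition, not part of the original answer), the rewritten mapper can be exercised locally by handing it a fake (start_position, line) tuple, which is roughly what BlobstoreLineInputReader yields for each record; BlobstoreZipLineInputReader passes a small tuple as the first element instead, but this mapper only ever reads data[1]. The user id and timestamps below are made-up sample data:

# Made-up input record: (start_position, csv line)
data = (0, "user1,01/02/12 10:00:00AM,01/02/12 10:05:00AM")
for userid, diff in time_count_map(data):
    print userid, diff   # prints: user1 300.0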

Using BlobstoreLineInputReader will allow the job to run much faster, as it can use more than one shard (up to 256), but it means you need to upload your files uncompressed, which can be a pain. I handle it by uploading the compressed files to an EC2 Windows server, then decompressing and uploading from there, since the upstream bandwidth is so large.
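
To tie this back to the pipeline shown in the question, here is a minimal sketch (my addition, not from the answer) of TimeCountPipeline switched over to the zip-line reader. The only changes are the input reader string and the blob-key mapper parameter: the line-based readers take the key(s) under blob_keys rather than blob_key in the library versions I have seen, so verify the exact name against your copy of mapreduce/input_readers.py:

class TimeCountPipeline(base_handler.PipelineBase):
  """A pipeline to run the Time count demo with a line-based input reader."""

  def run(self, filekey, blobkey):
    output = yield mapreduce_pipeline.MapreducePipeline(
        "time_count",
        "main.time_count_map",
        "main.time_count_reduce",
        # Yields one line of text per map call instead of a whole file,
        # which keeps per-request memory low.
        "mapreduce.input_readers.BlobstoreZipLineInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_keys": [blobkey],  # assumed parameter name; check your library version
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=32)
    yield StoreOutput("TimeCount", filekey, output)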
