Memory limit hit with appengine-mapreduce
Question
I'm working with the appengine-mapreduce library and have modified the demo to fit my purpose. Basically I have over a million lines in the following format: userid, time1, time2. My purpose is to find the difference between time1 and time2 for each userid.
However, when I run this on Google App Engine, I encounter this error message in the logs section:
Exceeded soft private memory limit with 180.56 MB after servicing 130 requests total

While handling this request, the process that handled this request was found to be using too much memory and was terminated. This is likely to cause a new process to be used for the next request to your application. If you see this message frequently, you may have a memory leak in your application.
import csv
import logging
import time

def time_count_map(data):
    """Time count map function."""
    (entry, text_fn) = data
    text = text_fn()
    try:
        q = text.split('\n')
        for m in q:
            reader = csv.reader([m.replace('\0', '')], skipinitialspace=True)
            for s in reader:
                # Calculate time elapsed
                sdw = s[1]
                start_date = time.strptime(sdw, "%m/%d/%y %I:%M:%S%p")
                edw = s[2]
                end_date = time.strptime(edw, "%m/%d/%y %I:%M:%S%p")
                time_difference = time.mktime(end_date) - time.mktime(start_date)
                yield (s[0], time_difference)
    except IndexError as e:
        logging.debug(e)
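The per-line parsing at the heart of the mapper can be exercised on its own, independent of App Engine. A minimal standalone sketch (the function name elapsed_seconds and the sample line are invented for illustration; the CSV layout and timestamp format are taken from the question):

```python
import csv
import time

def elapsed_seconds(line):
    """Parse one 'userid, start, end' CSV line and return (userid, seconds)."""
    row = next(csv.reader([line], skipinitialspace=True))
    fmt = "%m/%d/%y %I:%M:%S%p"
    start = time.mktime(time.strptime(row[1], fmt))
    end = time.mktime(time.strptime(row[2], fmt))
    return row[0], end - start

print(elapsed_seconds("user42, 01/01/12 01:00:00AM, 01/01/12 01:30:00AM"))
# -> ('user42', 1800.0)
```

Note that mktime interprets the struct_time in the local timezone, but since both timestamps go through the same conversion, the difference is unaffected.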
def time_count_reduce(key, values):
    """Time count reduce function."""
    total = 0.0  # named `total` to avoid shadowing the `time` module
    for subtime in values:
        total += float(subtime)
    yield "%s: %d\n" % (key, int(total))
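Because the reducer is pure Python, it can be sanity-checked outside the mapreduce framework by calling it directly with sample values (the key and values below are invented for illustration):

```python
def time_count_reduce(key, values):
    """Sum the per-record time differences for one userid."""
    total = 0.0  # avoid shadowing the `time` module
    for subtime in values:
        total += float(subtime)
    yield "%s: %d\n" % (key, int(total))

# The framework passes values as strings, so the reducer casts them to float.
print(next(time_count_reduce("user42", ["1800.0", "600.5"])))
# -> user42: 2400
```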
Can anyone suggest how I can optimize my code further? Thanks!
This is the pipeline handler:
class TimeCountPipeline(base_handler.PipelineBase):
    """A pipeline to run Time count demo.

    Args:
        blobkey: blobkey to process as string. Should be a zip archive with
            text files inside.
    """

    def run(self, filekey, blobkey):
        logging.debug("filename is %s" % filekey)
        output = yield mapreduce_pipeline.MapreducePipeline(
            "time_count",
            "main.time_count_map",
            "main.time_count_reduce",
            "mapreduce.input_readers.BlobstoreZipInputReader",
            "mapreduce.output_writers.BlobstoreOutputWriter",
            mapper_params={
                "blob_key": blobkey,
            },
            reducer_params={
                "mime_type": "text/plain",
            },
            shards=32)
        yield StoreOutput("TimeCount", filekey, output)
mapreduce.yaml:
mapreduce:
- name: Make messages lowercase
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.lower_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
- name: Make messages upper case
  params:
  - name: done_callback
    value: /done
  mapper:
    handler: main.upper_case_posts
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: main.Post
    - name: processing_rate
      default: 100
    - name: shard_count
      default: 4
The rest of the files are exactly the same as the demo.
I've uploaded a copy of my code to Dropbox: http://dl.dropbox.com/u/4288806/demo%20compressed%20fail%20memory.zip
Answer
It is likely your input file exceeds the soft memory limit in size. For big files use either BlobstoreLineInputReader or BlobstoreZipLineInputReader.
These input readers pass something different to the map function: they pass the start_position in the file and the line of text.
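To make that reader contract concrete, here is a toy generator (not part of the mapreduce library; the name fake_line_reader is invented for illustration) that yields (start_position, line) pairs the way a line-based input reader hands them to the map function:

```python
def fake_line_reader(text):
    """Mimic the (start_position, line) pairs a line-based reader yields."""
    pos = 0
    for line in text.splitlines(True):  # keep line endings to track offsets
        yield pos, line.rstrip("\n")
        pos += len(line)

data = "a,1,2\nb,3,4\n"
print(list(fake_line_reader(data)))
# -> [(0, 'a,1,2'), (6, 'b,3,4')]
```

Each map call then handles a single line, so memory usage no longer scales with the size of the whole file.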
Your map function might look something like:
def time_count_map(data):
    """Time count map function."""
    text = data[1]
    try:
        reader = csv.reader([text.replace('\0', '')], skipinitialspace=True)
        for s in reader:
            # Calculate time elapsed
            sdw = s[1]
            start_date = time.strptime(sdw, "%m/%d/%y %I:%M:%S%p")
            edw = s[2]
            end_date = time.strptime(edw, "%m/%d/%y %I:%M:%S%p")
            time_difference = time.mktime(end_date) - time.mktime(start_date)
            yield (s[0], time_difference)
    except IndexError as e:
        logging.debug(e)
Using BlobstoreLineInputReader will allow the job to run much faster, as it can use more than one shard (up to 256), but it means you need to upload your files uncompressed, which can be a pain. I handle it by uploading the compressed files to an EC2 Windows server, then decompressing and uploading from there, since upstream bandwidth from EC2 is so large.