Running Google Dataflow pipeline from a Google App Engine app?


Question



I am creating a Dataflow job using DataflowPipelineRunner. I tried the following scenarios:

  1. Without specifying any machineType
  2. With a g1-small machine
  3. With n1-highmem-2

In all of the above scenarios, the input is a very small file (KB-sized) from GCS and the output is a BigQuery table.
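For context, here is a minimal sketch of roughly how such a job is built and submitted with the Dataflow Java SDK 1.x. The project, bucket, and file names are placeholders I have assumed, not values taken from the question:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.TextIO;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

    public class LaunchPipeline {
      public static void main(String[] args) {
        // Configure the job for the Dataflow service (placeholder values).
        DataflowPipelineOptions options =
            PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setRunner(DataflowPipelineRunner.class);
        options.setProject("my-project");                      // assumed project id
        options.setStagingLocation("gs://my-bucket/staging");  // assumed staging bucket
        options.setWorkerMachineType("n1-highmem-2");          // scenario 3; omit for the default

        // Minimal pipeline: read the small GCS file. The real job would also
        // count words and write the results to BigQuery.
        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.Read.from("gs://my-bucket/input.txt"));
        p.run();
      }
    }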

I got an out-of-memory error in all of these scenarios.

The size of my compiled code is 94 MB. I am only trying the word count example, and it did not read any input (it fails before the job starts). Please help me understand why I am getting this error.

Note: I am using App Engine to start the job.

Note: The same code works with beta version 0.4.150414.

EDIT 1

As per the suggestions in the answer, I tried the following:

  1. Switched from automatic scaling to basic scaling.
  2. Used instance class B2, which provides 256 MB of memory (see the configuration sketch below).
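In appengine-web.xml terms, that change corresponds roughly to the following (a sketch showing only the relevant elements; max-instances is just an example value):

    <appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
      <!-- Basic scaling with the B2 instance class (256 MB of memory) -->
      <instance-class>B2</instance-class>
      <basic-scaling>
        <max-instances>1</max-instances>
      </basic-scaling>
    </appengine-web-app>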

After these configuration changes, the Java heap memory problem was solved. But it then tries to upload a jar of more than 10 MB to the staging location, and hence it fails.

It logs the following exception:

com.google.api.client.http.HttpRequest execute: exception thrown while executing request
com.google.appengine.api.urlfetch.RequestPayloadTooLargeException: The request to https://www.googleapis.com/upload/storage/v1/b/pwccloudedw-stagging-bucket/o?name=appengine-api-L4wtoWwoElWmstI1Ia93cg.jar&uploadType=resumable&upload_id=AEnB2Uo6HCfw6Usa3aXlcOzg0g3RawrvuAxWuOUtQxwQdxoyA0cf22LKqno0Gu-hjKGLqXIo8MF2FHR63zTxrSmQ9Yk9HdCdZQ exceeded the 10 MiB limit.
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.convertApplicationException(URLFetchServiceImpl.java:157)
at com.google.appengine.api.urlfetch.URLFetchServiceImpl.fetch(URLFetchServiceImpl.java:45)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.fetchResponse(URLFetchServiceStreamHandler.java:543)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getInputStream(URLFetchServiceStreamHandler.java:422)
at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection.getResponseCode(URLFetchServiceStreamHandler.java:275)
at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:36)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
at java.util.concurrent.FutureTask.run(FutureTask.java:260)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
at java.security.AccessController.doPrivileged(Native Method)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
at java.lang.Thread.run(Thread.java:745)
at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

I tried directly uploading the jar file appengine-api-1.0-sdk-1.9.20.jar, but it still tries to upload this jar: appengine-api-L4wtoWwoElWmstI1Ia93cg.jar, which I don't recognize. Any idea what this jar is would be appreciated.

Please help me to fix this issue.

Solution

The short answer is that if you use AppEngine on a Managed VM you will not encounter the AppEngine sandbox limits (OOM when using an F1 or B1 instance class, execution time limit issues, whitelisted JRE classes). If you really want to run within the App Engine sandbox, then your use of the Dataflow SDK must conform to the limits of the AppEngine sandbox. Below I explain common issues and what people have done to conform to the AppEngine sandbox limits.

The Dataflow SDK requires an AppEngine instance class which has enough memory to execute the user's application to construct the pipeline, stage any resources, and send the job description to the Dataflow service. Typically we have seen that users need an instance class with more than 128 MB of memory to avoid OOM errors.

Constructing a pipeline and submitting it to the Dataflow service typically takes less than a couple of seconds if the required resources for your application are already staged. Uploading your JARs and any other resources to GCS can take longer than 60 seconds. This can be solved manually by pre-staging your JARs to GCS beforehand (the Dataflow SDK will skip staging them again if it detects they are already there) or by using a task queue to get a 10 minute limit (note that for large applications, 10 minutes may not be enough to stage all your resources).
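For example, the submission step can be moved into a push task queue handler; here is a rough sketch, where the handler path is an assumption and the servlet behind it would build, stage, and submit the pipeline:

    import com.google.appengine.api.taskqueue.Queue;
    import com.google.appengine.api.taskqueue.QueueFactory;
    import com.google.appengine.api.taskqueue.TaskOptions;

    public class PipelineLauncher {
      /** Defers pipeline submission to a task queue request, which gets a 10 minute deadline. */
      public static void enqueueLaunch() {
        Queue queue = QueueFactory.getDefaultQueue();
        // The servlet mapped at this (assumed) path constructs and submits the pipeline.
        queue.add(TaskOptions.Builder.withUrl("/tasks/launch-dataflow"));
      }
    }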

Finally, within the AppEngine sandbox environment, your code and all your dependencies are limited to using only whitelisted classes within the JRE, or you'll get an exception like:

java.lang.SecurityException:
  java.lang.IllegalAccessException: YYY is not allowed on ZZZ
  ...

EDIT 1

We perform a hash of the contents of the jars on the classpath and upload them to GCS with a modified filename. AppEngine runs a sandboxed environment with its own JARs; appengine-api-L4wtoWwoElWmstI1Ia93cg.jar refers to appengine-api.jar, which is a jar that the sandboxed environment adds. You can see from our PackageUtil#getUniqueContentName(...) that we just append -$HASH before .jar.
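Purely as an illustration of that naming scheme (this is not the actual PackageUtil code), the staged name could be derived like this:

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.security.MessageDigest;
    import java.util.Base64;

    public class UniqueContentName {
      // Illustrative sketch: hash the jar contents and append the hash before ".jar",
      // producing names of the form appengine-api-<hash>.jar.
      public static String uniqueContentName(String jarPath) throws Exception {
        byte[] contents = Files.readAllBytes(Paths.get(jarPath));
        byte[] digest = MessageDigest.getInstance("MD5").digest(contents);
        String hash = Base64.getUrlEncoder().withoutPadding().encodeToString(digest);
        String base = Paths.get(jarPath).getFileName().toString().replaceAll("\\.jar$", "");
        return base + "-" + hash + ".jar";
      }
    }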

We are working to figure out why you are seeing the RequestPayloadTooLarge exception. In the meantime it is recommended that you set the filesToStage option and filter out the jars not required to execute your Dataflow pipeline, to get around the issue that you face. You can see how we build the files to stage with DataflowPipelineRunner#detectClassPathResourcesToStage(...).
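A rough sketch of that workaround follows; the filter on "appengine-api" is an assumption about which jar can safely be dropped, and it assumes detectClassPathResourcesToStage is callable from your code:

    import java.util.ArrayList;
    import java.util.List;

    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

    public class StagingFilter {
      /** Stages only the jars the pipeline needs, skipping the App Engine sandbox jar. */
      public static void restrictFilesToStage(DataflowPipelineOptions options) {
        List<String> detected = DataflowPipelineRunner.detectClassPathResourcesToStage(
            StagingFilter.class.getClassLoader());
        List<String> toStage = new ArrayList<>();
        for (String path : detected) {
          // Drop the sandbox-injected appengine-api jar that exceeds the 10 MiB limit.
          if (!path.contains("appengine-api")) {
            toStage.add(path);
          }
        }
        options.setFilesToStage(toStage);
      }
    }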
