Pipeline Submission from App Engine


Problem Description



I have a requirement to send Datastore Entities to a BigQuery table while at the same time doing a transformation of the data. My design so far is as follows:
The AppEngine Java application publishes data to a Topic in the PUB/SUB service - got that working. A DataflowPipeline then subscribes to the topic and reads the messages. The transform is then done and the result written to BigQuery. I have some sample code running to test this.
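
For illustration only, here is a minimal sketch of what such a streaming pipeline could look like with the Dataflow SDK 1.x used in this project. The Pub/Sub topic, BigQuery dataset/table and schema below are placeholders, not the real project values:

import java.util.Arrays;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;

public class PubSubToBigQueryPipeline {
    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setRunner(DataflowPipelineRunner.class);
        options.setProject("projectname");                   // placeholder project id
        options.setStagingLocation("gs://pipeline_bucket2"); // staging bucket, as in the sample code below
        options.setStreaming(true);                          // Pub/Sub reads run as a streaming job

        // Hypothetical one-column schema for the transformed payload.
        TableSchema schema = new TableSchema().setFields(Arrays.asList(
                new TableFieldSchema().setName("payload").setType("STRING")));

        Pipeline p = Pipeline.create(options);
        p.apply(PubsubIO.Read.topic("projects/projectname/topics/entities")) // placeholder topic
         .apply(ParDo.named("Transform").of(new DoFn<String, TableRow>() {
             @Override
             public void processElement(ProcessContext c) {
                 // Example transformation: upper-case the message and wrap it in a row.
                 c.output(new TableRow().set("payload", c.element().toUpperCase()));
             }
         }))
         .apply(BigQueryIO.Write
                 .to("projectname:dataset.entities")                          // placeholder table
                 .withSchema(schema)
                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        p.run();
    }
}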

I have a crude Pipeline working on my local development machine which I can run - all working as demo code. This is run locally via mvn appengine:devserver

The question is now: how do you deploy the Dataflow Pipeline from Google App Engine? The development machine does not have access to the production environment, so I cannot get my Pipeline running on the Google Pipeline Service. I have tried to submit it from Google App Engine, but received out-of-memory errors. This seems related to some authentication problem. From other posts here on StackOverflow it seems as if this "deploy" from App Engine is not "officially" supported.

How would one do this in a production environment then?

Environment dependencies so far:
maven 3.3.0
Google AppEngine 1.9.28
Google API client 1.20.0
Java 1.7.0_79
Workstation - Windows 7
Google Development Environment: Gold Package
This is my sample code to get the pipeline process running....

// Configure the job to run on the Dataflow service (Dataflow SDK 1.x options).
DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
options.setNumWorkers(2);
options.setRunner(DataflowPipelineRunner.class);
options.setStagingLocation("gs://pipeline_bucket2"); // GCS bucket where the job artifacts are staged
options.setProject("projectname");
options.setJobName("starterpipeline");
options.setUpdate(true);

Pipeline p = Pipeline.create(options);

// Create two elements, upper-case them, then log the results.
p.apply(Create.of("Hello", "World")).apply(ParDo.named("StringExtract").of(new DoFn<String, String>() {
    @Override
    public void processElement(ProcessContext c) {
        c.output(c.element().toUpperCase());
    }
})).apply(ParDo.named("StringLogger").of(new DoFn<String, Void>() {
    @Override
    public void processElement(ProcessContext c) {
        LOG.info(c.element());
    }
}));

p.run();

This is my stack trace for the error when trying to run the code above:

Uncaught exception from servlet
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at com.google.apphosting.utils.security.urlfetch.URLFetchServiceStreamHandler$Connection$BufferingOutputStream.write(URLFetchServiceStreamHandler.java:586)
    at com.google.api.client.util.ByteStreams.copy(ByteStreams.java:55)
    at com.google.api.client.util.IOUtils.copy(IOUtils.java:94)
    at com.google.api.client.http.AbstractInputStreamContent.writeTo(AbstractInputStreamContent.java:72)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:79)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequestWithoutGZip(MediaHttpUploader.java:545)
    at com.google.api.client.googleapis.media.MediaHttpUploader.executeCurrentRequest(MediaHttpUploader.java:562)
    at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:419)
    at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
    at java.util.concurrent.FutureTask.run(FutureTask.java:260)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1168)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:605)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1$1.run(ApiProxyImpl.java:1152)
    at java.security.AccessController.doPrivileged(Native Method)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$1.run(ApiProxyImpl.java:1146)
    at java.lang.Thread.run(Thread.java:745)
    at com.google.apphosting.runtime.ApiProxyImpl$CurrentRequestThreadFactory$2$1.run(ApiProxyImpl.java:1195)

Solution

Dataflow uses a 64 MB buffer when communicating with Google Cloud Storage while uploading your application's artifacts. The OOM can be caused if the instance you're using doesn't have enough memory, for example if you're using an AppEngine instance with 128 MB of memory.
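
If the instance really is too small, one option (an assumption on my part, so check the current App Engine documentation for your runtime) is to give the module that submits the job a larger instance class in appengine-web.xml, for example:

<?xml version="1.0" encoding="utf-8"?>
<appengine-web-app xmlns="http://appengine.google.com/ns/1.0">
    <application>projectname</application>      <!-- placeholder application id -->
    <module>pipeline-submitter</module>         <!-- hypothetical module name -->
    <version>1</version>
    <threadsafe>true</threadsafe>
    <!-- F1 instances only have 128 MB of memory; F4 provides 512 MB, which leaves
         room for the 64 MB upload buffers used while staging the job artifacts. -->
    <instance-class>F4</instance-class>
</appengine-web-app>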

Also note that the first time your Dataflow pipeline is executed after you update the module, or after AppEngine does an internal update, the Dataflow SDK needs to upload all the application artifacts that changed to Google Cloud Storage. Depending on the application size, this can take more than 60 s, which is the request deadline for a frontend instance and can cause deadline-exceeded errors.
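
A common workaround for that 60 s frontend deadline (not part of the original answer, just a sketch of the usual pattern) is to move the actual pipeline submission off the user-facing request, for example into a push task queue task or a module with manual/basic scaling, where the request deadline is longer. A hypothetical servlet that only enqueues the work could look like this:

import java.io.IOException;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;

// Frontend handler that only enqueues the submission instead of running it inline.
public class EnqueuePipelineServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
        Queue queue = QueueFactory.getDefaultQueue();
        // "/tasks/launch-pipeline" is a hypothetical URL mapped to the handler that
        // actually builds the Pipeline and calls p.run(); push-queue tasks get a
        // 10-minute deadline on automatic-scaling instances instead of 60 seconds.
        queue.add(TaskOptions.Builder.withUrl("/tasks/launch-pipeline"));
        resp.setStatus(202); // accepted; the job is launched asynchronously
    }
}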
