How do I run a Google Dataflow pipeline from a Google App Engine app?

Question

I need to run a Dataflow pipeline on a regular basis. The FAQ for Dataflow states the following:

You can automate pipeline execution through Google App Engine or custom (CRON) job processes on GCE. Future releases of the SDK will support command line options for finer grained control over job management.

I've tried to run a very simple pipeline from my Java app, using this code:

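import java.io.IOException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

public class MyAnalyticsServlet extends HttpServlet {
    @Override
    public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
        resp.setContentType("text/plain");
        if (req.getRequestURI().equals("/dataflow/test")) {
            // Configure the pipeline to run on the Dataflow service.
            DataflowPipelineOptions options = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
            options.setProject("redacted");
            options.setRunner(DataflowPipelineRunner.class);
            Pipeline p = Pipeline.create(options);
            // TestTransform is my own transform (not shown here).
            p.apply(TextIO.Read.named("TestInput").from("gs://redacted/test/in.txt"))
                    .apply(new TestTransform())
                    .apply(TextIO.Write.named("TestOutput")
                            .to("gs://redacted/test")
                            .withNumShards(0)); // 0 leaves the shard count to the service
            p.run();
        } else {
            resp.setStatus(404);
            resp.getWriter().println("Not Found");
            return;
        }
        resp.getWriter().println("OK");
    }
}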

I get the following error:

java.lang.IllegalArgumentException: Methods [setRunner(Class), getRunner()] on [com.google.cloud.dataflow.sdk.options.PipelineOptions] do not conform to being bean properties.
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)
    at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.validateClass(PipelineOptionsFactory.java:1059)
    ...

Any ideas?
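
For readers unfamiliar with the wording of the error above: a getter and a setter form a JavaBeans property when the standard introspector can pair them under one name with the same type. The sketch below only illustrates that rule with java.beans.Introspector. DemoOptions, ReadOnlyOptions and BeanPropertyCheck are hypothetical names made up for this example. It does not reproduce the Dataflow SDK's own validation, and it does not explain why the check fails on App Engine.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;

class BeanPropertyCheck {
    // Hypothetical options-style interface with a matching getter/setter pair.
    public interface DemoOptions {
        Class<?> getRunner();
        void setRunner(Class<?> runner);
    }

    // Hypothetical interface with a getter only, so no writable property.
    public interface ReadOnlyOptions {
        String getName();
    }

    // Returns true if the introspector reports `name` as a property of `type`
    // that has both a read method and a write method.
    static boolean isReadWriteProperty(Class<?> type, String name) {
        try {
            BeanInfo info = Introspector.getBeanInfo(type);
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getName().equals(name)) {
                    return pd.getReadMethod() != null && pd.getWriteMethod() != null;
                }
            }
            return false;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Introspection failed for " + type, e);
        }
    }

    public static void main(String[] args) {
        System.out.println("runner: " + isReadWriteProperty(DemoOptions.class, "runner"));
        System.out.println("name: " + isReadWriteProperty(ReadOnlyOptions.class, "name"));
    }
}
```

Running a check like this inside the affected environment, against the real options interface, may help show whether the introspector there sees the getRunner/setRunner pair differently from a local JVM.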

Recommended answer

I know you're using Java; however, this example, which walks through how to do this from a GAE Python Flex app, might be helpful: http://amygdala.github.io/dataflow/app_engine/2017/04/14/gae_dataflow.html
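
For the other route the FAQ quote mentions, a custom job process outside App Engine, a minimal Java sketch of running something "on a regular basis" might look like the following. It assumes a standalone JVM process. PeriodicLauncher and the launch callback are hypothetical names, and the callback is a placeholder for whatever code builds and runs the pipeline. It is not taken from the linked example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class PeriodicLauncher {
    // Runs `launch` once immediately and then at a fixed rate until the
    // returned future is cancelled or the executor is shut down.
    static ScheduledFuture<?> schedule(ScheduledExecutorService executor,
                                       Runnable launch, long period, TimeUnit unit) {
        return executor.scheduleAtFixedRate(() -> {
            try {
                launch.run();
            } catch (RuntimeException e) {
                // An uncaught exception would suppress all later runs,
                // so log the failure and keep the schedule alive.
                System.err.println("Launch failed: " + e.getMessage());
            }
        }, 0, period, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        // Placeholder task; a real job would create and run the pipeline here.
        ScheduledFuture<?> job = schedule(executor,
                () -> System.out.println("launching pipeline (placeholder)"),
                100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        job.cancel(false);
        executor.shutdown();
    }
}
```

The try/catch inside the scheduled task is the main design choice: scheduleAtFixedRate stops rescheduling a task once it throws, so one failed launch would otherwise silently end the whole schedule.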
