Local Pubsub Emulator won't work with Dataflow

Problem description

I am developing a Dataflow pipeline in Java, and the input comes from Pubsub. Later, I found a guide on how to use the local Pubsub emulator so that I would not need to deploy to GCP in order to test.

Here is my simple code:

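import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

// Enclosing class name is a placeholder; the original snippet omitted it.
public class PubsubEmulatorPipeline {

    // PipelineOptionsFactory builds a proxy for this interface, so it is declared public.
    public interface Options extends PipelineOptions, PubsubOptions, StreamingOptions {

        @Description("Pub/Sub topic to read messages from")
        String getTopic();
        void setTopic(String topic);

        @Description("Pub/Sub subscription to read messages from")
        String getSubscription();
        void setSubscription(String subscription);

        @Description("Local file output")
        String getOutput();
        void setOutput(String output);
    }

    public static void main(String[] args) {

        Options options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(Options.class);
        options.setStreaming(true);
        // Emulator address; note that this value has no http:// scheme.
        options.setPubsubRootUrl("localhost:8085");

        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()));
        // other .apply(...) steps omitted

        pipeline.run();
    }
}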

I was able to follow the guide, including the part where I use the example Python code to create a topic, a subscription and a publisher, and even publish messages. When I use the Python code to interact with the Pubsub emulator, I see the message "Detected HTTP/2 connection" in the command line where I run the emulator:

Executing: cmd /c C:\...\google-cloud-sdk\platform\pubsub-emulator\bin\cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Apr 10, 2020 3:33:26 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] Apr 10, 2020 3:33:26 PM io.gapi.emulators.netty.NettyUtil applyJava7LongHostnameWorkaround
[pubsub] INFO: Unable to apply Java 7 long hostname workaround.
[pubsub] Apr 10, 2020 3:33:27 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.

I compiled and ran the code in Eclipse using a Dataflow Pipeline Run Configuration, but got this error:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create subscription: 
...
Caused by: java.lang.RuntimeException: Failed to create subscription: 
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.createRandomSubscription(PubsubUnboundedSource.java:1427)
...
Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: unknown protocol: localhost
...
Caused by: java.net.MalformedURLException: unknown protocol: localhost
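The first error can be reproduced with java.net.URL alone. Without a scheme, the text before the first colon ("localhost") is parsed as the protocol name, and because no handler exists for a protocol called localhost, the constructor throws. This sketch assumes, based on the MalformedURLException in the trace rather than on Beam's source, that the root URL is eventually parsed this way; the class and method names are hypothetical.

```java
import java.net.MalformedURLException;
import java.net.URL;

// Hypothetical helper class, used only to show how java.net.URL parses the value.
public class RootUrlCheck {

    // Parses a candidate root URL with java.net.URL and reports either the
    // parsed protocol/host/port or the parser's error message.
    @SuppressWarnings("deprecation") // new URL(String) is deprecated in newer JDKs but still works
    static String check(String rootUrl) {
        try {
            URL url = new URL(rootUrl);
            return "ok: protocol=" + url.getProtocol()
                    + " host=" + url.getHost()
                    + " port=" + url.getPort();
        } catch (MalformedURLException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // No scheme: "localhost" is taken as the protocol name.
        System.out.println(check("localhost:8085"));        // error: unknown protocol: localhost
        // Explicit scheme: host and port are parsed as intended.
        System.out.println(check("http://127.0.0.1:8085")); // ok: protocol=http host=127.0.0.1 port=8085
    }
}
```

The error message matches the last "Caused by" line of the trace, which is why adding an explicit http:// scheme gets past this first failure.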

When I add the http:// prefix to the value in options.setPubsubRootUrl("localhost:8085"), I get an exception that repeats indefinitely:

com.google.api.client.http.HttpRequest execute
WARNING: exception thrown while executing request
java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
    at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
    at java.net.PlainSocketImpl.connect(Unknown Source)
    at java.net.SocksSocketImpl.connect(Unknown Source)

It seems to reach the Pubsub emulator but cannot connect, because the command line where I run the emulator also prints this indefinitely:

[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.
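One way to check whether the refused connections depend on which loopback address is tried is to resolve localhost to all of its addresses and attempt a plain TCP connection to each one on the emulator port. This is a diagnostic sketch only: the class name, method name and 1-second timeout are hypothetical choices, and it does not speak the Pub/Sub protocol; it only checks whether a listener accepts a TCP connection at each address.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical diagnostic class; not part of Beam or the emulator.
public class LoopbackProbe {

    // Resolves host to every address it maps to and tries a TCP connect to
    // each one on the given port. Returns address -> true if a listener
    // accepted the connection, false if it was refused or timed out.
    static Map<String, Boolean> probe(String host, int port, int timeoutMs) {
        Map<String, Boolean> results = new LinkedHashMap<>();
        InetAddress[] addresses;
        try {
            addresses = InetAddress.getAllByName(host);
        } catch (UnknownHostException e) {
            return results; // host did not resolve; nothing to probe
        }
        for (InetAddress address : addresses) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(address, port), timeoutMs);
                results.put(address.getHostAddress(), true);
            } catch (IOException e) {
                // e.g. "Connection refused": nothing listening on this address
                results.put(address.getHostAddress(), false);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Port 8085 matches the emulator command line in the question.
        probe("localhost", 8085, 1000).forEach((address, open) ->
                System.out.println(address + " -> " + (open ? "listening" : "not reachable")));
    }
}
```

On a machine where localhost resolves to both an IPv6 and an IPv4 loopback address, a listener bound only to IPv4 would show up as not reachable on the IPv6 entry and listening on the IPv4 one.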

How can I make my Dataflow work with Pubsub emulator?

Solution

You are attempting to connect to the Pubsub emulator from the Beam Direct Runner, using the Dataflow fork of the Beam 2.5 SDK. The Dataflow 2.5 SDK and the Eclipse plugin were deprecated as of June 6, 2019. However, this setup should still work.

You need to prefix your PubsubRootUrl with 'http://' in Beam, as you've discovered. The second problem you are seeing (the repeated "Connection refused") indicates that nothing is listening on localhost:8085 at the address being tried. This is likely because localhost can resolve to two loopback addresses: 127.0.0.1 (IPv4) and ::1 (IPv6). The Pubsub emulator only listens on IPv4, and Windows tries IPv6 first. Try replacing localhost with 127.0.0.1 to force IPv4. You should end up with this:

options.setPubsubRootUrl("http://127.0.0.1:8085");
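If the emulator address comes from configuration as a plain host:port string, both fixes can be applied in one place before calling setPubsubRootUrl. This is a minimal sketch; EmulatorRootUrl and rootUrlFor are hypothetical names, and mapping localhost to 127.0.0.1 simply encodes the IPv4 workaround described above.

```java
// Hypothetical helper; not part of Beam's PubsubOptions API.
public class EmulatorRootUrl {

    // Turns a "host:port" string (optionally already prefixed with http://)
    // into a root URL: it adds an explicit http:// scheme and replaces
    // "localhost" with the IPv4 loopback address 127.0.0.1.
    static String rootUrlFor(String hostPort) {
        String s = hostPort.trim();
        if (s.startsWith("http://")) {
            s = s.substring("http://".length()); // re-added below, exactly once
        }
        int colon = s.lastIndexOf(':');
        if (colon <= 0 || colon == s.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got: " + hostPort);
        }
        String host = s.substring(0, colon);
        String port = s.substring(colon + 1);
        if (host.equalsIgnoreCase("localhost")) {
            host = "127.0.0.1"; // force IPv4 instead of a possible IPv6 loopback
        }
        return "http://" + host + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(rootUrlFor("localhost:8085"));        // http://127.0.0.1:8085
        System.out.println(rootUrlFor("http://localhost:8085")); // http://127.0.0.1:8085
    }
}
```

In the pipeline, the call would then read options.setPubsubRootUrl(EmulatorRootUrl.rootUrlFor("localhost:8085")); so that neither the missing scheme nor the IPv6 resolution can slip back in.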