Local Pubsub Emulator won't work with Dataflow
Problem description
I am developing a Dataflow pipeline in Java, and its input comes from Pubsub. Later, I found a guide on how to use the local Pubsub emulator, so that I would not need to deploy to GCP in order to test.
Here is my simple code:
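import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;

// The enclosing class name is a placeholder; the post does not show it.
public class PubsubToFilePipeline {

  private interface Options extends PipelineOptions, PubsubOptions, StreamingOptions {
    @Description("Pub/Sub topic to read messages from")
    String getTopic();
    void setTopic(String topic);

    @Description("Pub/Sub subscription to read messages from")
    String getSubscription();
    void setSubscription(String subscription);

    @Description("Local file output")
    String getOutput();
    void setOutput(String output);
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(Options.class);
    options.setStreaming(true);
    // No scheme here: this is the line that leads to "unknown protocol: localhost" below.
    options.setPubsubRootUrl("localhost:8085");

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()));
        // ... other .apply(...) steps chained on the result
    pipeline.run();
  }
}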
I was able to follow the guide, including the part where I use the example Python code to create a topic, a subscription and a publisher, and even to publish messages. When I use the Python code to interact with the Pubsub emulator, I see the message "Detected HTTP/2 connection" in the command line where I run the emulator:
Executing: cmd /c C:\...\google-cloud-sdk\platform\pubsub-emulator\bin\cloud-pubsub-emulator.bat --host=localhost --port=8085
[pubsub] This is the Google Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
[pubsub] Apr 10, 2020 3:33:26 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: IAM integration is disabled. IAM policy methods and ACL checks are not supported
[pubsub] Apr 10, 2020 3:33:26 PM io.gapi.emulators.netty.NettyUtil applyJava7LongHostnameWorkaround
[pubsub] INFO: Unable to apply Java 7 long hostname workaround.
[pubsub] Apr 10, 2020 3:33:27 PM com.google.cloud.pubsub.testing.v1.Main main
[pubsub] INFO: Server started, listening on 8085
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:38 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:34:52 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected HTTP/2 connection.
I compiled and ran the code in Eclipse using the Dataflow Pipeline Run Configuration, but I get this error:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: Failed to create subscription:
...
Caused by: java.lang.RuntimeException: Failed to create subscription:
at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.createRandomSubscription(PubsubUnboundedSource.java:1427)
...
Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: unknown protocol: localhost
...
Caused by: java.net.MalformedURLException: unknown protocol: localhost
When I add http:// to the URL in the line options.setPubsubRootUrl("localhost:8085"), I get an exception that repeats endlessly:
com.google.api.client.http.HttpRequest execute
WARNING: exception thrown while executing request
java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.net.PlainSocketImpl.connect(Unknown Source)
at java.net.SocksSocketImpl.connect(Unknown Source)
It seems to reach the Pubsub emulator but cannot connect, because the command line where I run the emulator also prints this endlessly:
[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] Apr 10, 2020 3:49:30 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.
How can I make my Dataflow work with Pubsub emulator?
You are attempting to connect to the Pubsub emulator from the Beam Direct Runner, using the Dataflow fork of the Beam 2.5 SDK. The Dataflow 2.5 SDK and the Eclipse plugin were deprecated as of June 6, 2019. However, this setup should still work.
As you've discovered, you need to prefix your PubsubRootUrl with 'http://' in Beam. The second problem you are seeing (Connection refused) indicates that nothing is listening on localhost:8085. This is likely because there are actually two localhosts: IPv4 (127.0.0.1) and IPv6 (::1). The Pubsub emulator only listens on IPv4, and Windows tries IPv6 first. Try replacing localhost with 127.0.0.1 to force IPv4. You should end up with this:
options.setPubsubRootUrl("http://127.0.0.1:8085");
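If you would rather not hard-code the address, the same fix can be wrapped in a small helper. The following is a minimal sketch in plain JDK Java (no Beam dependency). It assumes the emulator address arrives as host:port, for instance from the PUBSUB_EMULATOR_HOST variable that gcloud beta emulators pubsub env-init prints. The class EmulatorUrls and its methods rootUrl and isListening are hypothetical names, not part of Beam or the emulator. rootUrl adds the missing http:// scheme and swaps localhost for 127.0.0.1. isListening tries a TCP connection, and main uses it on every address that localhost resolves to, which lets you check the IPv4/IPv6 explanation on your own machine.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;

// Hypothetical helper class; not part of Beam, the Dataflow SDK, or the emulator.
final class EmulatorUrls {
  private EmulatorUrls() {}

  // Turns "host:port" into an absolute root URL such as "http://127.0.0.1:8085".
  // "localhost" is replaced with the IPv4 loopback literal so the client cannot end up on ::1.
  static String rootUrl(String hostPort) {
    String value = hostPort.trim();
    if (!value.contains("://")) {
      // Without a scheme, java.net.URL reads "localhost" as the protocol name.
      value = "http://" + value;
    }
    URI uri;
    try {
      uri = new URI(value);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Not a valid emulator address: " + hostPort, e);
    }
    if (uri.getHost() == null || uri.getPort() == -1) {
      throw new IllegalArgumentException("Expected host:port, got: " + hostPort);
    }
    String host = uri.getHost().equalsIgnoreCase("localhost") ? "127.0.0.1" : uri.getHost();
    return uri.getScheme() + "://" + host + ":" + uri.getPort();
  }

  // Returns true if something accepts a TCP connection at addr:port within the timeout.
  static boolean isListening(InetAddress addr, int port, int timeoutMillis) {
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(addr, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      return false; // e.g. java.net.ConnectException: Connection refused
    }
  }

  public static void main(String[] args) throws IOException {
    String hostPort = System.getenv().getOrDefault("PUBSUB_EMULATOR_HOST", "localhost:8085");

    // Reproduces the first error from the question when the value has no scheme.
    try {
      new URL(hostPort);
    } catch (MalformedURLException e) {
      System.out.println("bare host:port -> " + e.getMessage());
    }

    String root = rootUrl(hostPort);
    int port = URI.create(root).getPort();
    // "localhost" may resolve to several addresses (for example 127.0.0.1 and ::1); probe each one.
    for (InetAddress addr : InetAddress.getAllByName("localhost")) {
      System.out.println(addr.getHostAddress() + " listening: " + isListening(addr, port, 500));
    }
    System.out.println("Use: options.setPubsubRootUrl(\"" + root + "\")");
  }
}
```

In the pipeline, the result would then be passed on as options.setPubsubRootUrl(EmulatorUrls.rootUrl(hostPort)). If the probe reports that only the IPv4 address is listening, that is consistent with the IPv4/IPv6 explanation above.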