GCP Dataflow runner error when deploying pipeline using beam-nuggets library - "Failed to read inputs in the data_plane."


Problem description

I have been testing an Apache Beam pipeline within the Apache Beam notebooks provided by GCP, using a Kafka instance as input and BigQuery as output. I have been able to run the pipeline successfully via the Interactive runner, but when I deploy the same pipeline to the Dataflow runner it never seems to actually read from the Kafka topic that has been defined. Looking into the logs gives me the error:

Failed to read inputs in the data plane. Traceback (most recent call last): File /usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py,

The implementation is based on this post here.

Any ideas? Code provided below:

from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners import DataflowRunner  # needed for the Dataflow deployment below
from beam_nuggets.io import kafkaio

# kafka_topic, ip_addr, options, table_spec, table_schema, and the
# preprocess/model DoFn classes are defined elsewhere in the notebook.
kafka_config = {"topic": kafka_topic, "bootstrap_servers": ip_addr}

# p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)  # <- use for test
p = beam.Pipeline(DataflowRunner(), options=options)  # <- use for dataflow implementation

notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
# Distinct PCollection names so they don't shadow the preprocess/model DoFn classes.
preprocessed = notifications | "Pre-process for model" >> beam.ParDo(preprocess())
predictions = preprocessed | "format & predict" >> beam.ParDo(model())

predictions | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)

result = p.run()  # submit the job

Error message from the logs:

Failed to read inputs in the data plane. Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 528, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 689, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "DNS resolution failed"
    debug_error_string = "{"created":"@1595595923.509682344","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1595595923.509650517","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1595595923.509649070","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1595595923.509645878","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}"
>

And also:

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "DNS resolution failed"
    debug_error_string = "{"created":"@1594205651.745381243","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1594205651.745371624","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1594205651.745370349","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1594205651.745367499","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}"
>

Pipeline setup:

Python sdk harness started with pipeline_options: {'streaming': True, 'project': 'example-project', 'job_name': 'beamapp-root-0727105627-001796', 'staging_location': 'example-staging-location', 'temp_location': 'example-staging-location', 'region': 'europe-west1', 'labels': ['goog-dataflow-notebook=2_23_0_dev'], 'subnetwork': 'example-subnetwork', 'experiments': ['use_fastavro', 'use_multiple_sdk_containers'], 'setup_file': '/root/notebook/workspace/setup.py', 'sdk_location': '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-2.23.0.dev0.tar.gz', 'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 'job_port': '0', 'artifact_port': '0', 'expansion_port': '0'}

Accepted answer

It seems this isn't possible with my implementation plan, though it appears to be more viable with multi-language pipelines. I opened a ticket with Google Support on this matter and got the following reply after some time investigating:

……此时 Python 没有任何可以使用的 KafkaIO数据流运行器.您可以使用 Java 作为解决方法.如果你需要Python 用于特定的东西(TensorFlow 或类似的),一个可能是将消息从 Kafka 发送到 PubSub 主题(通过另一个只从 Kafka 读取并发布到 PS 或外部应用程序)."

"… at this moment Python doesn't have any KafkaIO that works with DataflowRunner. You can use Java as a workaround. In case you need Python for something in particular (TensorFlow or similar), a possibility is to send the message from Kafka to a PubSub topic (via another pipeline that only reads from Kafka and publish to PS or an external application)."

So feel free to take their advice, or you might be able to hack something together. I simply revised my architecture to use Pub/Sub instead of Kafka.
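For reference, a minimal sketch of the revised read side, assuming the messages now arrive on a Pub/Sub subscription; the subscription path is a hypothetical placeholder, and the rest of the pipeline (pre-processing, model, BigQuery write) stays the same:

import apache_beam as beam

# p is the streaming Pipeline defined as in the question; ReadFromPubSub
# replaces the kafkaio.KafkaConsume step and works on the Dataflow runner.
notifications = (p
    | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(
        subscription="projects/example-project/subscriptions/notifications-sub")  # hypothetical
    | "Decode" >> beam.Map(lambda b: b.decode("utf-8")))  # ReadFromPubSub emits bytes by default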
