Does GCP Dataflow support kafka IO in python?


Problem Description

I am trying to read data from a Kafka topic using the kafka.ReadFromKafka() method in Python. My code looks like this:

import apache_beam as beam
from apache_beam.io.external import kafka
from apache_beam.options.pipeline_options import PipelineOptions  # was missing

options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    plants = (
        p
        | 'read' >> kafka.ReadFromKafka({'bootstrap.servers': 'public_ip:9092'}, ['topic1']))

But I get the following error message.

ERROR:apache_beam.runners.runner:Error while visiting read
Traceback (most recent call last):
  File "test_file.py", line 16, in <module>
    | 'read' >> kafka.ReadFromKafka({'bootstrap.servers': 'localhost:9092'}, ['topic1'])
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 547, in __exit__
    self.run().wait_until_finish()
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 526, in run
    return self.runner.run_pipeline(self, self._options)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 565, in run_pipeline
    self.visit_transforms(pipeline, options)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/runner.py", line 224, in visit_transforms
    pipeline.visit(RunVisitor(self))
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 572, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 1075, in visit
    part.visit(visitor, pipeline, visited)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 1078, in visit
    visitor.visit_transform(self)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/runner.py", line 219, in visit_transform
    self.runner.run_transform(transform_node, options)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/runner.py", line 249, in run_transform
    (transform_node.transform, self))
NotImplementedError: Execution of [<ReadFromKafka(PTransform) label=[ReadFromKafka(beam:external:java:kafka:read:v1)]>] not implemented in runner <apache_beam.runners.dataflow.dataflow_runner.DataflowRunner object at 0x7f72463344a8>.

Is it because the apache beam Dataflow runner doesn't support KafkaIO?

Recommended Answer

The Python SDK for Beam does support connecting to Kafka. Below is a code snippet that uses the third-party beam-nuggets library:

from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio

kafka_topic = "notifications"
kafka_config = {"topic": kafka_topic,
                "bootstrap_servers": "localhost:9092",
                "group_id": "notification_consumer_group"}

with beam.Pipeline(options=PipelineOptions()) as p:
    notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
    notifications | 'Writing to stdout' >> beam.Map(print)
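To actually submit this pipeline to Dataflow rather than run it locally, you would pass runner options when building PipelineOptions. A minimal sketch, using standard Beam/Dataflow pipeline option flags; the project, region, and bucket names here are hypothetical placeholders you must replace:

```python
# Hypothetical GCP project and bucket names; substitute your own values.
dataflow_args = [
    "--runner=DataflowRunner",          # submit to Dataflow instead of the local runner
    "--project=my-gcp-project",
    "--region=us-central1",
    "--temp_location=gs://my-bucket/tmp",
    "--streaming",                      # Kafka is an unbounded (streaming) source
]

# You would then construct the pipeline with these options:
#   options = PipelineOptions(dataflow_args)
#   with beam.Pipeline(options=options) as p: ...
print(" ".join(dataflow_args))
```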

bootstrap_servers is a comma-separated list of host:port pairs where your brokers are deployed. You will get this information from your Kafka cluster configuration.
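For example, with multiple brokers the value is still a single comma-separated string. A small sketch with made-up broker hostnames:

```python
# Hypothetical three-broker cluster; replace the hosts with your own.
brokers = ["kafka-1.internal:9092", "kafka-2.internal:9092", "kafka-3.internal:9092"]

kafka_config = {
    "topic": "notifications",
    "bootstrap_servers": ",".join(brokers),  # single comma-separated string
    "group_id": "notification_consumer_group",
}

print(kafka_config["bootstrap_servers"])
```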

