Using the Beam Python SDK and PortableRunner to connect to Kafka with SSL


Question

I have the code below for connecting to Kafka using the Python Beam SDK. I know that the ReadFromKafka transform runs in a Java SDK harness (a Docker container). However, I have not been able to figure out how to make the files referenced by ssl.truststore.location and ssl.keystore.location accessible inside the SDK harness's Docker environment. The job_endpoint argument points to a job server started with java -jar beam-runners-flink-1.10-job-server-2.27.0.jar --flink-master localhost:8081.

```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholders: the remaining command-line args (e.g. from argparse's
# parse_known_args()) and the custom Java SDK harness image and tag.
pipeline_args = []
my_beam_sdk_docker_image = "beam_java_sdk"
my_beam_docker_tag = "latest"

pipeline_args.extend([
    '--job_name=paul_test',
    '--runner=PortableRunner',
    '--sdk_location=container',
    '--job_endpoint=localhost:8099',
    '--streaming',
    "--environment_type=DOCKER",
    f"--sdk_harness_container_image_overrides=.*java.*,{my_beam_sdk_docker_image}:{my_beam_docker_tag}",
])

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
    kafka = pipeline | ReadFromKafka(
        consumer_config={
            "bootstrap.servers": "bootstrap-server:17032",
            "security.protocol": "SSL",
            "ssl.truststore.location": "/opt/keys/client.truststore.jks",  # how do I make this available to the Java SDK harness?
            "ssl.truststore.password": "password",
            "ssl.keystore.type": "PKCS12",
            "ssl.keystore.location": "/opt/keys/client.keystore.p12",  # how do I make this available to the Java SDK harness?
            "ssl.keystore.password": "password",
            "group.id": "group",
            "basic.auth.credentials.source": "USER_INFO",
            "schema.registry.basic.auth.user.info": "user:password"
        },
        topics=["topic"],
        max_num_records=2,
        # expansion_service="localhost:56938"
    )

    # Print each record read from Kafka.
    kafka | beam.Map(print)
```

I tried specifying the image override option as --sdk_harness_container_image_overrides='.*java.*,beam_java_sdk:latest'. Here, beam_java_sdk:latest is a Docker image I based on apache/beam_java11_sdk:2.27.0, and it pulls the credentials in its entrypoint.sh. But Beam does not appear to use it. I see

INFO  org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory  - Still waiting for startup of environment apache/beam_java11_sdk:2.27.0 for worker id 1-1

in the logs, which is soon and inevitably followed by

Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /opt/keys/client.keystore.p12 of type PKCS12
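
The override value has the form regex,image. As a quick sanity check, the minimal sketch below applies such an entry to the image name from the log line. It uses a hypothetical helper, not Beam's own implementation, and assumes the regex is matched against the default image name. It shows that .*java.* does match apache/beam_java11_sdk:2.27.0. So the log line suggests the override was never applied to the Kafka transform's environment, rather than that the pattern was wrong.

```python
import re


def apply_image_override(default_image: str, override: str) -> str:
    """Hypothetical helper: apply one 'regex,replacement' override entry.

    Assumes re.match semantics against the default image name: if the
    regex matches, the replacement image is returned; otherwise the
    default image is kept unchanged.
    """
    pattern, replacement = override.split(",", 1)
    if re.match(pattern, default_image):
        return replacement
    return default_image


# The image named in the DockerEnvironmentFactory log line.
default_image = "apache/beam_java11_sdk:2.27.0"

# The override passed on the command line in the question.
override = ".*java.*,beam_java_sdk:latest"

print(apply_image_override(default_image, override))
```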

In summary, my question is: in Apache Beam, is it possible to make files available inside the Java SDK harness Docker container from the Python Beam SDK? If so, how?

Many thanks.

Answer

Currently, there is no straightforward way to achieve this. There are ongoing discussions and tracking issues about supporting this kind of expansion service customization (see here, here, BEAM-12538 and BEAM-12539). That is the short answer.

The long answer is yes, you can do it. First, copy ExpansionService.java into your codebase and build your own custom expansion service. In it, you set the default environment type (DOCKER) and the default environment config (your image) here. You then have to run this expansion service manually and pass its address to ReadFromKafka through its expansion_service parameter.
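
Once the custom expansion service is running, the Python side only needs its address. The minimal sketch below assumes the service was started by hand on a hypothetical localhost:8097. It waits until that port accepts TCP connections, then returns the "host:port" string to pass as ReadFromKafka's expansion_service argument. The helper name wait_for_expansion_service and the port are illustrative; they are not part of Beam.

```python
import socket
import time


def wait_for_expansion_service(host: str, port: int, timeout: float = 30.0) -> str:
    """Hypothetical helper: block until host:port accepts TCP connections.

    Returns the "host:port" string expected by ReadFromKafka's
    expansion_service argument, or raises TimeoutError once the
    deadline has passed.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return f"{host}:{port}"
        except OSError:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"expansion service not reachable at {host}:{port}")
            time.sleep(0.5)


# Usage (assumed address of the manually started custom expansion service):
# address = wait_for_expansion_service("localhost", 8097)
# kafka = pipeline | ReadFromKafka(consumer_config=..., topics=["topic"],
#                                  expansion_service=address)
```

Checking the port first turns a misconfigured or not-yet-started expansion service into a clear error on the Python side, instead of a failure during pipeline construction.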
