使用 Beam Python SDK 和 PortableRunner 通过 SSL 连接到 Kafka [英] Using the Beam Python SDK and PortableRunner to connect to Kafka with SSL
问题描述
我有以下代码用于使用 python beam sdk.我知道 ReadFromKafka
转换是在 java sdk 工具(docker 容器)中运行的,但我一直无法弄清楚如何制作 ssl.truststore.location
和 ssl.keystore.location
可在 sdk 工具的 docker 环境中访问.job_endpoint
参数指向 java -jar beam-runners-flink-1.10-job-server-2.27.0.jar --flink-master localhost:8081
>
I have the code below for connecting to kafka using the python beam sdk. I know that the ReadFromKafka
transform is run in a java sdk harness (docker container) but I have not been able to figure out how to make ssl.truststore.location
and ssl.keystore.location
accesible inside the sdk harness' docker environment. The job_endpoint
argument is pointing to java -jar beam-runners-flink-1.10-job-server-2.27.0.jar --flink-master localhost:8081
pipeline_args.extend([
'--job_name=paul_test',
'--runner=PortableRunner',
'--sdk_location=container',
'--job_endpoint=localhost:8099',
'--streaming',
"--environment_type=DOCKER",
f"--sdk_harness_container_image_overrides=.*java.*,{my_beam_sdk_docker_image}:{my_beam_docker_tag}",
])
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline:
kafka = pipeline | ReadFromKafka(
consumer_config={
"bootstrap.servers": "bootstrap-server:17032",
"security.protocol": "SSL",
"ssl.truststore.location": "/opt/keys/client.truststore.jks", # how do I make this available to the Java SDK harness
"ssl.truststore.password": "password",
"ssl.keystore.type": "PKCS12",
"ssl.keystore.location": "/opt/keys/client.keystore.p12", # how do I make this available to the Java SDK harness
"ssl.keystore.password": "password",
"group.id": "group",
"basic.auth.credentials.source": "USER_INFO",
"schema.registry.basic.auth.user.info": "user:password"
},
topics=["topic"],
max_num_records=2,
# expansion_service="localhost:56938"
)
kafka | beam.Map(lambda x: print(x))
我尝试指定 image override 选项为 --sdk_harness_container_image_overrides='.*java.*,beam_java_sdk:latest'
- 其中 beam_java_sdk:latest
是我基于的 docker 镜像apache/beam_java11_sdk:2.27.0
并在其 entrypoint.sh 中提取凭证.但是 Beam 似乎没有使用它,我明白了
I tried specifying the image override option as --sdk_harness_container_image_overrides='.*java.*,beam_java_sdk:latest'
- where beam_java_sdk:latest
is a docker image I based on apache/beam_java11_sdk:2.27.0
and that pulls the credetials in its entrypoint.sh. But Beam does not appear to use it, I see
INFO org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory - Still waiting for startup of environment apache/beam_java11_sdk:2.27.0 for worker id 1-1
在日志中.很快就会不可避免地紧随其后
in the logs. Which is soon inevitebly followed by
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to load SSL keystore /opt/keys/client.keystore.p12 of type PKCS12
总而言之,我的问题是,在 Apache Beam 中,是否可以从 python 梁 sdk 中使文件在 java sdk 工具 docker 容器中可用?如果是这样,怎么做?
非常感谢.
推荐答案
目前,没有直接的方法来实现这一点.正在进行讨论和跟踪问题以提供对这种扩展服务定制的支持(请参阅 此处,此处、BEAM-12538 和 BEAM-12539).这就是简短的回答.
Currently, there is no straightforward way to achieve this. There is ongoing discussion and tracking issues to provide support for this kind of expansion service customization (see here, here, BEAM-12538 and BEAM-12539). That is the short answer.
长答案是肯定的,你可以这样做.您必须复制并粘贴 ExpansionService.java 到您的代码库中并构建您的自定义扩展服务,您可以在其中指定默认环境 (DOCKER) 和默认环境配置(您的图像)此处.然后,您必须手动运行此扩展服务并使用 ReadFromKafka 的 expansion_service
参数指定其地址.
Long answer is yes, you can do that. You would have to copy &paste ExpansionService.java into your codebase and build your custom expansion service, where you specify default environment (DOCKER) and default environment config (your image) here. You then have to manually run this expansion service and specify its address using expansion_service
parameter of ReadFromKafka.
这篇关于使用 Beam Python SDK 和 PortableRunner 通过 SSL 连接到 Kafka的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!