Apache Beam pipeline with JdbcIO


Question

I have an Apache Beam pipeline that reads from BigQuery and then tries to write the results to Postgres. The code uses the JdbcIO connector (WriteToJdbc) and the Dataflow runner. I am using Python 3.8.7 and Apache Beam 2.28.0.

I was using the default expansion service. I also tried running a custom expansion service but still got the same error. Any ideas?

Here is the code:

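from typing import NamedTuple

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import WriteToJdbc


def export_to_postgres(user_options, pipeline_options, password):
    """Creates a pipeline that reads rows from BigQuery and writes them to Postgres."""

    # Schema'd row type. WriteToJdbc is a cross-language (Java) transform,
    # so the PCollection it consumes must be encoded with RowCoder.
    TeacherRow = NamedTuple(
        "TeacherRow",
        [
            ("teacher_id", str),
            ("first_name", str),
            ("last_name", str),
            ("total_all_publisher", int),
        ])

    coders.registry.register_coder(TeacherRow, coders.RowCoder)

    p = beam.Pipeline(options=pipeline_options)

    (p
     | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
         query=user_options.query_src,
         use_standard_sql=True)
     # ReadFromBigQuery emits one dict per row, keyed by column name.
     | 'To TeacherRow' >> beam.Map(lambda x: TeacherRow(
         teacher_id=str(x['teacher_id']),
         first_name=str(x['first_name']),
         last_name=str(x['last_name']),
         total_all_publisher=int(x['total_all_publisher'])))
     .with_output_types(TeacherRow)
     | 'Window' >> beam.WindowInto(beam.window.FixedWindows(10))
     .with_output_types(TeacherRow)
     | 'Write to jdbc' >> WriteToJdbc(
         table_name="teacher",
         driver_class_name='org.postgresql.Driver',
         # Replace "your ip address" with the Postgres host.
         jdbc_url='jdbc:{}://{}:{}/{}'.format(
             "postgresql", "your ip address", "5432", "postgres"),
         username="postgres",
         # Use the password argument instead of a hard-coded literal.
         password=password)
     )
    p.run()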
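To sanity-check the row-mapping step without Dataflow, BigQuery, or Postgres, the conversion done inside the beam.Map can be pulled out into a plain function and exercised on its own. This is a minimal sketch, assuming ReadFromBigQuery yields each row as a Python dict keyed by column name; `to_teacher_row` and `build_jdbc_url` are hypothetical helper names, not part of Beam, and the sample values are made up.

```python
from typing import NamedTuple


class TeacherRow(NamedTuple):
    """Same fields and types as the TeacherRow NamedTuple in the question's code."""
    teacher_id: str
    first_name: str
    last_name: str
    total_all_publisher: int


def to_teacher_row(row: dict) -> TeacherRow:
    # Hypothetical helper: assumes each BigQuery row arrives as a dict
    # keyed by column name, and coerces every field to the schema type.
    return TeacherRow(
        teacher_id=str(row["teacher_id"]),
        first_name=str(row["first_name"]),
        last_name=str(row["last_name"]),
        total_all_publisher=int(row["total_all_publisher"]),
    )


def build_jdbc_url(host: str, port: int, database: str) -> str:
    # Hypothetical helper: PostgreSQL JDBC URLs take the form
    # jdbc:postgresql://host:port/database
    return "jdbc:postgresql://{}:{}/{}".format(host, port, database)


if __name__ == "__main__":
    # Made-up sample row; values deliberately use the "wrong" Python types.
    sample = {"teacher_id": 42, "first_name": "Ada",
              "last_name": "Lovelace", "total_all_publisher": "7"}
    print(to_teacher_row(sample))
    # → TeacherRow(teacher_id='42', first_name='Ada', last_name='Lovelace', total_all_publisher=7)
    print(build_jdbc_url("10.0.0.5", 5432, "postgres"))
    # → jdbc:postgresql://10.0.0.5:5432/postgres
```

Keeping the coercion explicit matters here because RowCoder encodes each field according to the declared NamedTuple type, so a str/int mismatch would otherwise surface only at encode time inside the pipeline.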

I get the following error:

  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/Users/trex/workspace/workflow/dataflow/bq-to-pg.py", line 102, in <module>
    run()
  File "/Users/Trex/workspace/workflow/dataflow/bq-to-pg.py", line 97, in run
    export_to_postgres(user_options, pipeline_options, password)
  File "/Users/trex/workspace/workflow/dataflow/bq-to-pg.py", line 58, in export_to_postgres
    p.run()
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 529, in run
    return Pipeline.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 904, in from_runner_api
    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1259, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/runners/pipeline_context.py", line 115, in get_by_id
    self._id_to_obj[id] = self._obj_type.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/pipeline.py", line 1236, in from_runner_api
    transform = ptransform.PTransform.from_runner_api(proto, context)
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/ptransform.py", line 700, in from_runner_api
    return constructor(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1419, in from_runner_api_parameter
    DoFnInfo.from_runner_api(
  File "/Users/trex/.pyenv/versions/3.8.7/lib/python3.8/site-packages/apache_beam/transforms/core.py", line 1493, in from_runner_api
    raise ValueError('Unexpected DoFn type: %s' % spec.urn)
ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1

Answer

This is https://issues.apache.org/jira/browse/BEAM-12043; hopefully a fix will make it into the next release.