pyflink tableAPI, multiple sources to single processing table sequence


Problem description

I'm trying to implement a pyflink job (via the Table API) that performs some basic processing on data coming from multiple sources, after the data from each source has been converted into a standard format. I can transform the data from each source into the required format (a table schema with the specified columns), but I can't get that data into a single "processing table" for the further steps. This is what I'm trying to do:

Convert the different source schemas into a standard processing-table schema, which can then be windowed, run through table functions (over multiple input rows), and so on.

Edit:

Thanks for the suggestion to avoid potentially reserved names. I changed that and added an alias after the UDF call (splitting the Row result into column values/names). That got me a bit further; the exception I now see is on the definition of the central merge table (the standard table in the diagram):

py4j.protocol.Py4JJavaError: An error occurred while calling o50.execute.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.process'.

Table options are:
'connector'='print'
...

Caused by: org.apache.flink.table.api.ValidationException: Connector 'print' can only be used as a sink. It cannot be used as a source.
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:598)
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:559)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:144)
Can anyone point out what the correct table definition should be when the table doesn't reference a source like Kafka, a file, or input data (a temporary view table or something, I'm guessing)? In other words, what should the table definition/registration look like?

Updated code:

import os
import sys
from pandas.core.frame import DataFrame
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import TableEnvironment, EnvironmentSettings, DataTypes, StreamTableEnvironment
import pandas as pd
from pyflink.table.udf import ScalarFunction, TableFunction, udf, udtf
import json
import numpy as np
from pyflink.common import Row

# ====== source UDFs:
class sourceUdf(ScalarFunction):
    def eval(self, string: str):
        from dateutil import parser
        import json
        input = json.loads(string)
        time = parser.parse(input['timestamp'])
        return Row(input['key'], time, input) 

class source_two_Udf(ScalarFunction):
    def eval(self, string: str):
        # other custom logic to produce row schema of format: [key, timestamp, content]
        return Row(....)

class sinkUdf(ScalarFunction):
    def eval(self, input: Row):
        recordkey, tm, content = input

        import json
        out = {
            'recordKey': recordkey,
            'tm': str(tm),
            'content': content
        }
        return json.dumps(out)

# ======

  env = StreamExecutionEnvironment.get_execution_environment()
  env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")
  settings = EnvironmentSettings.new_instance() \
                      .in_streaming_mode() \
                      .use_blink_planner() \
                      .build()
  # table environment backed by the streaming environment; used by the DDL statements below
  t_env = StreamTableEnvironment.create(env, environment_settings=settings)

  source_ddl = f"""
            CREATE TABLE source_table(
                entry STRING
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SOURCE_TOPIC}',
              'properties.bootstrap.servers' = '{','.join(KAFKA_SERVERS)}',
              'properties.group.id' = '{KAFKA_GROUP_ID}',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{KAFKA_USERNAME}" password="{KAFKA_PASSWORD}";',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'raw'
            )
            """

  source_ddl2 = f"""
            CREATE TABLE second_source_table(
                entry STRING
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SOURCE2_TOPIC}',
              'properties.bootstrap.servers' = '{','.join(KAFKA_SERVERS)}',
              'properties.group.id' = '{KAFKA_GROUP_ID}',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{KAFKA_USERNAME}" password="{KAFKA_PASSWORD}";',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'raw'
            )
            """

  sink_ddl = f"""
            CREATE TABLE sink_table(
                entry STRING
            ) WITH (
              'connector' = 'print'
            )
            """

  t_env.execute_sql(source_ddl).wait()
  t_env.execute_sql(source_ddl2).wait()
  t_env.execute_sql(sink_ddl).wait()

  # intermediate table which should receive all the converted source data
  t_env.execute_sql(f"""
        CREATE TABLE process (
            recordKey STRING, 
            tm TIMESTAMP(3), 
            content STRING,
            WATERMARK FOR tm AS tm - INTERVAL '5' SECONDS
        ) WITH ( 
            'connector' = 'print'
        )
    """).wait()

  source_udf = udf(sourceUdf(), result_type=DataTypes.ROW([DataTypes.FIELD('recordKey', DataTypes.STRING()),DataTypes.FIELD('tm',  DataTypes.TIMESTAMP(3)), DataTypes.FIELD('content', DataTypes.STRING()) ]))
  t_env.register_function("sourceUdf", source_udf)

  two_udf = udf(source_two_Udf(), result_type=DataTypes.ROW([DataTypes.FIELD('recordKey', DataTypes.STRING()),DataTypes.FIELD('tm',  DataTypes.TIMESTAMP(3)), DataTypes.FIELD('content', DataTypes.STRING()) ]))
  t_env.register_function("source_two_Udf", two_udf)

  sink_udf = udf(sinkUdf(),
    result_type=DataTypes.STRING())
  t_env.register_function("sinkUdf", sink_udf)

  tbl = t_env.from_path('source_table')
  tbl = tbl.map(source_udf).alias('recordKey', 'tm', 'content')
  tbl = tbl.insert_into('process')

  tblsrc = t_env.from_path('second_source_table')
  tblsrc = tblsrc.map(two_udf).alias('recordKey', 'tm', 'content')
  tblsrc = tblsrc.insert_into('process')

  merge = t_env.from_path('process') 
  merge = merge.map(sink_udf).alias('entry')
  merge = merge.insert_into('sink_table')

  t_env.execute("test")

Answer

The problem with your pipeline is that you are using the table process as a source table here:

merge = t_env.from_path('process')

Since process uses connector = 'print', you cannot use it as a source, because the print connector can only be used as a sink (insert into).

Looking at your pipeline, it seems you use the process table to merge the content coming from the first and the second pipeline and then push this result into the sink. I would suggest you go with one of the following two options:

  • Use a connector that can act both as a sink and as a source, e.g. the filesystem connector
  • Split the pipeline so that the intermediate table can be a view (a sketch of this option follows below)
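
As an illustration of the second option, here is a minimal sketch (not from the original answer, untested, reusing the names from the question's code and assuming both converted tables share the same schema): the two converted streams are unioned and registered as a temporary view, so nothing physical needs to back the intermediate table. For the first option it would be enough to replace 'connector' = 'print' with a connector that supports both reading and writing, such as 'filesystem', in the process DDL.

# convert each source with its UDF and bring both to the common schema
converted_one = t_env.from_path('source_table').map(source_udf).alias('recordKey', 'tm', 'content')
converted_two = t_env.from_path('second_source_table').map(two_udf).alias('recordKey', 'tm', 'content')

# union_all keeps duplicates and only requires the two schemas to match
merged = converted_one.union_all(converted_two)

# register the merged result as a temporary view named 'process'
# (the earlier CREATE TABLE process ... DDL is no longer needed)
t_env.create_temporary_view('process', merged)

# the downstream stage reads the view like any other table and writes to the real sink
t_env.from_path('process') \
    .map(sink_udf).alias('entry') \
    .execute_insert('sink_table').wait()

Note that a view built this way exposes tm as a plain TIMESTAMP column rather than an event-time attribute, so the watermark from the original process DDL would have to be reintroduced (for example on the source tables) before event-time windowing.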
