如何在Apache Flink中创建外部目录表 [英] How can I create an External Catalog Table in Apache Flink

查看:418
本文介绍了如何在Apache Flink中创建外部目录表的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图创建和ExternalCatalog在Apache Flink表中使用.我创建并添加到Flink表环境(此处为官方文档).由于某种原因,目录"中存在唯一的外部表,因此在扫描过程中找不到该表.我在上面的代码中错过了什么?

I tried to create and ExternalCatalog to use in Apache Flink Table. I created and added to the Flink table environment (here the official documentation). For some reason, the only external table present in the 'catalog', it is not found during the scan. What I missed in the code above?

  val catalogName = s"externalCatalog$fileNumber"
  val ec: ExternalCatalog = getExternalCatalog(catalogName, 1, tableEnv)
  tableEnv.registerExternalCatalog(catalogName, ec)
  val s1: Table = tableEnv.scan("S_EXT")

  def getExternalCatalog(catalogName: String, fileNumber: Int, tableEnv: BatchTableEnvironment): ExternalCatalog = {
    val cat = new InMemoryExternalCatalog(catalogName)
    // external Catalog table
    val externalCatalogTableS = getExternalCatalogTable("S")
    // add external Catalog table
    cat.createTable("S_EXT", externalCatalogTableS, ignoreIfExists = false)
    cat
  }

  private def getExternalCatalogTable(fileName: String): ExternalCatalogTable = {
    // connector descriptor
    val connectorDescriptor = new FileSystem()
    connectorDescriptor.path(getFilePath(fileNumber, fileName))
    // format
    val fd = new Csv()
    fd.field("X", Types.STRING)
    fd.field("Y", Types.STRING)
    fd.fieldDelimiter(",")
    // statistic
    val statistics = new Statistics()
    statistics.rowCount(0)
    // metadata
    val md = new Metadata()
    ExternalCatalogTable.builder(connectorDescriptor)
      .withFormat(fd)
      .withStatistics(statistics)
      .withMetadata(md)
      .asTableSource()
  }

上面的示例是此推荐答案

这可能是名称空间问题.外部目录中的表由目录名称(可能是模式)的列表标识,最后是表名称.

This is probably a namespace issue. Tables in external catalogs are identified by a list of names of the catalog, (potentially schemas,) and finally the table name.

在您的示例中,以下应该起作用:

In your example, the following should work:

val s1: Table = tableEnv.scan("externalCatalog1", "S_EXT")

您可以查看

You can have a look at the ExternalCatalogTest to see how external catalogs can be used.

这篇关于如何在Apache Flink中创建外部目录表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆