How can I create an External Catalog Table in Apache Flink

Question

I tried to create an ExternalCatalog to use with the Apache Flink Table API. I created it and added it to the Flink table environment (following the official documentation). For some reason, the only external table in the catalog is not found during the scan. What am I missing in the code below?

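  import org.apache.flink.table.api.{Table, Types}
  import org.apache.flink.table.api.scala.BatchTableEnvironment
  import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogTable, InMemoryExternalCatalog}
  import org.apache.flink.table.descriptors.{Csv, FileSystem, Metadata, Statistics}

  val catalogName = s"externalCatalog$fileNumber"
  val ec: ExternalCatalog = getExternalCatalog(catalogName, 1, tableEnv)
  tableEnv.registerExternalCatalog(catalogName, ec)
  // fails: the table "S_EXT" is not found
  val s1: Table = tableEnv.scan("S_EXT")

  def getExternalCatalog(catalogName: String, fileNumber: Int, tableEnv: BatchTableEnvironment): ExternalCatalog = {
    // in-memory catalog that will hold a single table
    val cat = new InMemoryExternalCatalog(catalogName)
    // external catalog table backed by the file "S"
    val externalCatalogTableS = getExternalCatalogTable("S", fileNumber)
    // add the table to the catalog under the name "S_EXT"
    cat.createTable("S_EXT", externalCatalogTableS, ignoreIfExists = false)
    cat
  }

  // fileNumber is passed explicitly instead of being read from an outer scope
  private def getExternalCatalogTable(fileName: String, fileNumber: Int): ExternalCatalogTable = {
    // connector descriptor: read from the file system
    val connectorDescriptor = new FileSystem()
    connectorDescriptor.path(getFilePath(fileNumber, fileName))
    // format: comma-separated file with two string fields
    val fd = new Csv()
    fd.field("X", Types.STRING)
    fd.field("Y", Types.STRING)
    fd.fieldDelimiter(",")
    // statistics
    val statistics = new Statistics()
    statistics.rowCount(0)
    // metadata
    val md = new Metadata()
    // build the catalog table and expose it as a table source
    ExternalCatalogTable.builder(connectorDescriptor)
      .withFormat(fd)
      .withStatistics(statistics)
      .withMetadata(md)
      .asTableSource()
  }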

The example above is part of this test file in git.

Answer

This is probably a namespace issue. Tables in an external catalog are identified by a path: the name of the catalog, optionally one or more schema names, and finally the table name.

In your example, the following should work:

val s1: Table = tableEnv.scan("externalCatalog1", "S_EXT")

You can have a look at the ExternalCatalogTest to see how external catalogs can be used.
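
The namespace point can be illustrated with a small toy model. This is a hypothetical sketch, not Flink code: `ToyCatalog`, `ToyTableEnv` and the error strings are invented for illustration, and the model assumes (as a simplification) that a one-segment name is looked up only among tables registered directly on the environment, while a longer path must start with a registered catalog name. Its `scan(path: String*)` mirrors the varargs shape of the `tableEnv.scan` call in the answer.

```scala
// Toy model of name resolution for external catalogs.
// ToyCatalog and ToyTableEnv are hypothetical; this is NOT Flink's implementation.

// A catalog holds tables and, optionally, nested sub-catalogs (schemas).
final case class ToyCatalog(
    tables: Map[String, String] = Map.empty,
    subCatalogs: Map[String, ToyCatalog] = Map.empty
) {
  // Every segment except the last selects a sub-catalog; the last names a table.
  def resolve(path: List[String]): Option[String] = path match {
    case Nil          => None
    case table :: Nil => tables.get(table)
    case sub :: rest  => subCatalogs.get(sub).flatMap(_.resolve(rest))
  }
}

final class ToyTableEnv {
  // Tables registered directly on the environment (the default namespace).
  private var rootTables = Map.empty[String, String]
  // External catalogs, keyed by the name given at registration time.
  private var externalCatalogs = Map.empty[String, ToyCatalog]

  def registerTable(name: String, table: String): Unit =
    rootTables += name -> table

  def registerExternalCatalog(name: String, catalog: ToyCatalog): Unit =
    externalCatalogs += name -> catalog

  // Assumption: a one-segment path is looked up only in the default namespace;
  // a longer path must start with the name of a registered external catalog.
  def scan(path: String*): Either[String, String] = path.toList match {
    case Nil => Left("empty table path")
    case name :: Nil =>
      rootTables.get(name).toRight(s"not found: $name")
    case catalogName :: rest =>
      externalCatalogs
        .get(catalogName)
        .flatMap(_.resolve(rest))
        .toRight(s"not found: ${path.mkString(".")}")
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val env = new ToyTableEnv
    val catalog = ToyCatalog(tables = Map("S_EXT" -> "CSV table with fields X, Y"))
    env.registerExternalCatalog("externalCatalog1", catalog)

    println(env.scan("S_EXT"))                     // Left(not found: S_EXT)
    println(env.scan("externalCatalog1", "S_EXT")) // Right(CSV table with fields X, Y)
  }
}
```

In this model, the bare name `S_EXT` fails because the table lives inside `externalCatalog1`, not in the default namespace; prefixing the catalog name (and any schema names for nested catalogs) makes the lookup succeed.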
