在 Nifi 中从 Avro Schema 创建 Postgresql 表 [英] Create a Postgresql table from Avro Schema in Nifi

查看:55
本文介绍了在 Nifi 中从 Avro Schema 创建 Postgresql 表的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用 InferAvroSchema 我得到了我的文件的 Avro Schema.我想使用这个 Avro 模式在 PostregSql 中创建一个表.我必须使用哪个处理器.

Using InferAvroSchema I got an Avro Schema of my file. I want to create a table in PostregSql using this Avro schema. Which processor I have to use.

我使用:GetFile->InferAvroSchema->我想从这个模式创建一个表->放置databaseRecord.

I use : GetFile->InferAvroSchema-> I want to create a table from this schema -> Put databaseRecord.

avro 架构:

{
  "type" : "record",
  "name" : "warranty",
  "doc" : "Schema generated by Kite",
  "fields" : [ {
    "name" : "id",
    "type" : "long",
    "doc" : "Type inferred from '1'"
  }, {
    "name" : "train_id",
    "type" : "long",
    "doc" : "Type inferred from '21691'"
  }, {
    "name" : "siemens_nr",
    "type" : "string",
    "doc" : "Type inferred from 'Loco-001'"
  }, {
    "name" : "uic_nr",
    "type" : "long",
    "doc" : "Type inferred from '193901'"
  }, {
    "name" : "Configuration",
    "type" : "string",
    "doc" : "Type inferred from 'ZP28'"
  }, {
    "name" : "Warranty_Status",
    "type" : "string",
    "doc" : "Type inferred from 'Out_of_Warranty'"
  }, {
    "name" : "Warranty_Data_Type",
    "type" : "string",
    "doc" : "Type inferred from 'Real_based_on_preliminary_acceptance_date'"
  }, {
    "name" : "of_progression",
    "type" : "long",
    "doc" : "Type inferred from '100'"
  }, {
    "name" : "Delivery_Date",
    "type" : "string",
    "doc" : "Type inferred from '18/12/2009'"
  }, {
    "name" : "Warranty_on_Delivery_Date",
    "type" : "string",
    "doc" : "Type inferred from '18/12/2013'"
  }, {
    "name" : "Customer_Status",
    "type" : "string",
    "doc" : "Type inferred from 'homologation'"
  }, {
    "name" : "Commissioning_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/10/2010'"
  }, {
    "name" : "Preliminary_acceptance_date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2011'"
  }, {
    "name" : "Warranty_Start_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2011'"
  }, {
    "name" : "Warranty_End_Date",
    "type" : "string",
    "doc" : "Type inferred from '6/01/2013'"
  }, {
    "name" : "Effective_End_Warranty_Date",
    "type" : [ "null", "string" ],
    "doc" : "Type inferred from 'null'",
    "default" : null
  }, {
    "name" : "Level_2_in_function",
    "type" : "string",
    "doc" : "Type inferred from '17/07/2015'"
  }, {
    "name" : "Baseline",
    "type" : "string",
    "doc" : "Type inferred from '2.10.23.4'"
  }, {
    "name" : "RELN_revision",
    "type" : "string",
    "doc" : "Type inferred from '0434-26.3'"
  }, {
    "name" : "TC_report",
    "type" : "string",
    "doc" : "Type inferred from 'A480140'"
  }, {
    "name" : "Last_version_Date",
    "type" : "string",
    "doc" : "Type inferred from 'A-23/09/2015'"
  }, {
    "name" : "ETCS_ID_NID_Engine",
    "type" : [ "null", "long" ],
    "doc" : "Type inferred from '13001'",
    "default" : null
  }, {
    "name" : "Item_Type",
    "type" : "string",
    "doc" : "Type inferred from 'Item'"
  }, {
    "name" : "Path",
    "type" : "string",
    "doc" : "Type inferred from 'sites/TrWMTISnerc_Community/Lists/X4Trains'"
  } ]
}

我的表创建表是:

Create table warranty(
  id    float,
  train_id float,
  siemens_nr    varchar(255),
  uic_nr    float,
  configuration varchar(255),
  warranty_status   varchar(255),
  warranty_data_type    varchar(255),
  of_progression    float,
  delivery_date varchar(255),
  warranty_on_delivery_date varchar(255),
  customer_status   varchar(255),
  commissioning_date    varchar(255),
  preliminary_acceptance_date   varchar(255),
  warranty_start_date   varchar(255),
  warranty_end_date varchar(255),
  effective_end_warranty_date   varchar(255),
  level_2_in_function   varchar(255),
  baseline  varchar(255),
  reln_revision varchar(255),
  tc_report varchar(255),
  last_version_Date varchar(255),
  etcs_id_nid_engine    float,
  item_type  varchar(255),
  path varchar(255)

)

推荐答案

我可以在nifi v1.5+中推荐ExecuteGroovyScript处理器

I can suggest ExecuteGroovyScript processor in nifi v1.5+

定义新属性 SQL.mydb - 系统将提示您将其值链接到数据库 (DBCPConnectionPool)

define new property SQL.mydb - you will be prompted to link its value to a database (DBCPConnectionPool)

选择要建表的数据库

并使用此脚本(假设 avro 架构在流文件内容中)

and use this script (assume avro schema is in the flow file content)

import groovy.json.JsonSlurper

def ff = session.get()
if(!ff)return

//parse avro schema from flow file content
def schema = ff.read().withReader("UTF-8"){ new JsonSlurper().parse(it) }

//define type mapping
def typeMap = [
    "string"            : "varchar(255)",
    "long"              : "numeric(10)",
    [ "null", "string" ]: "varchar(255)",
    [ "null", "long" ]  : "numeric(10)",
]

assert schema.name && schema.name=~/^w.*/

//build create table statement
def createTable = "create table ${schema.name} (" +
    schema.fields.collect{ "
  ${it.name.padRight(39)} ${typeMap[it.type]}" }.join(',') +
    "
)"

//execute statement through the custom defined property
//SQL.mydb references http://docs.groovy-lang.org/2.4.10/html/api/groovy/sql/Sql.html object
SQL.mydb.execute(createTable as String) //important to cast to String

//transfer flow file to success
REL_SUCCESS << ff

这篇关于在 Nifi 中从 Avro Schema 创建 Postgresql 表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆