Can't understand TwoPhaseCommitSinkFunction lifecycle

Problem description

I needed a sink to a Postgres DB, so I started to build a custom Flink SinkFunction. Since FlinkKafkaProducer implements TwoPhaseCommitSinkFunction, I decided to do the same. As stated in O'Reilly's book Stream Processing with Apache Flink, you just need to implement the abstract methods, enable checkpointing, and you're good to go. But what really happens when I run my code is that the commit method is called only once, and it is called before invoke, which is totally unexpected, since you shouldn't be ready to commit if your set of ready-to-commit transactions is empty. Worse still, after committing, invoke is called for all of the transaction lines present in my file, and then abort is called, which is even more unexpected.

When the sink is initialized, it is my understanding that the following should occur:

  1. beginTransaction is called and sends an identifier to invoke
  2. invoke adds the lines to the transaction, according to the identifier received
  3. preCommit makes all final modifications on the current transaction data
  4. commit handles the finalized transaction of the pre-committed data

So, I can't see why my program doesn't show this behaviour.
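
To make the expected order concrete, here is a minimal, self-contained sketch of the per-checkpoint call sequence I had in mind. This is my own illustration, not the actual Flink implementation: the transaction handle is modeled as a plain String (as in my sink below), and the tie between preCommit/commit and checkpoint snapshot/completion is my assumption about how the base class drives these hooks.

object ExpectedLifecycleSketch {

  def beginTransaction(): String = "txn-" + java.util.UUID.randomUUID.toString

  def invoke(txn: String, value: String): Unit =
    println(s"invoke: add $value to $txn")

  def preCommit(txn: String): Unit =
    println(s"preCommit: flush $txn")

  def commit(txn: String): Unit =
    println(s"commit: finalize $txn")

  def main(args: Array[String]): Unit = {
    var current = beginTransaction()                  // step 1
    Seq("line1", "line2").foreach(invoke(current, _)) // step 2
    // a checkpoint barrier arrives (assumed trigger for preCommit):
    preCommit(current)                                // step 3
    val pending = current
    current = beginTransaction()                      // a new transaction starts
    // the checkpoint is acknowledged as complete (assumed trigger for commit):
    commit(pending)                                   // step 4
  }
}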

Here goes my sink code:

package PostgresConnector

import java.sql.{BatchUpdateException, DriverManager, PreparedStatement, SQLException, Timestamp}
import java.text.ParseException
import java.util.{Date, Properties, UUID}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}
import org.apache.flink.streaming.api.scala._
import org.slf4j.{Logger, LoggerFactory}




class PostgreSink(props : Properties, config : ExecutionConfig) extends TwoPhaseCommitSinkFunction[(String,String,String,String),String,String](createTypeInformation[String].createSerializer(config),createTypeInformation[String].createSerializer(config)){
    
    private var transactionMap : Map[String,Array[(String,String,String,String)]] = Map()
    
    private var parsedQuery : PreparedStatement = _
    
    private val insertionString : String = "INSERT INTO mydb (field1,field2,point) values (?,?,point(?,?))"
    
    override def invoke(transaction: String, value: (String,String,String,String), context: SinkFunction.Context[_]): Unit = {
    
        val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
        
        val res = this.transactionMap.get(transaction)
        
        if(res.isDefined){
    
            var array = res.get
            
            array = array ++ Array(value)
    
            this.transactionMap += (transaction -> array)
            
        }else{
    
            val array = Array(value)
    
            this.transactionMap += (transaction -> array)
            
            
        }
    
        LOG.info("\n\nPassing through invoke\n\n")
        
        ()
        
    }
    
    override def beginTransaction(): String = {
    
        val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
        
        val identifier = UUID.randomUUID.toString
    
        LOG.info("\n\nPassing through beginTransaction\n\n")
        
        identifier
        
        
    }
    
    override def preCommit(transaction: String): Unit = {
        
        val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
    
        try{
        
            val tuple : Option[Array[(String,String,String,String)]]= this.transactionMap.get(transaction)
        
            if(tuple.isDefined){
            
                tuple.get.foreach( (value : (String,String,String,String)) => {
                
                    LOG.info("\n\n"+value.toString()+"\n\n")
                
                    this.parsedQuery.setString(1,value._1)
                    this.parsedQuery.setString(2,value._2)
                    this.parsedQuery.setString(3,value._3)
                    this.parsedQuery.setString(4,value._4)
                    this.parsedQuery.addBatch()
                
                })
                
            }
        
        }catch{
        
            case e : SQLException =>
                LOG.info("\n\nError when adding transaction to batch: SQLException\n\n")
        
            case f : ParseException =>
                LOG.info("\n\nError when adding transaction to batch: ParseException\n\n")
        
            case g : NoSuchElementException =>
                LOG.info("\n\nError when adding transaction to batch: NoSuchElementException\n\n")
        
            case h : Exception =>
                LOG.info("\n\nError when adding transaction to batch: Exception\n\n")
        
        }
        
        this.transactionMap = this.transactionMap.empty
    
        LOG.info("\n\nPassing through preCommit...\n\n")
    }
    
    override def commit(transaction: String): Unit = {
    
        val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
        
        if(this.parsedQuery != null) {
            LOG.info("\n\n" + this.parsedQuery.toString+ "\n\n")
        }
        
        try{
            
            this.parsedQuery.executeBatch
            val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
            LOG.info("\n\nExecuting batch\n\n")
            
        }catch{
    
            case e : SQLException =>
                val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
                LOG.info("\n\n"+"Error : SQLException"+"\n\n")
            
        }
        
        this.transactionMap = this.transactionMap.empty
    
        LOG.info("\n\nPassing through commit...\n\n")
        
    }
    
    override def abort(transaction: String): Unit = {
    
        val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
        
        this.transactionMap = this.transactionMap.empty
    
        LOG.info("\n\nPassing through abort...\n\n")
        
    }
    
    override def open(parameters: Configuration): Unit = {
    
        val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
        
        val driver = props.getProperty("driver")
        val url = props.getProperty("url")
        val user = props.getProperty("user")
        val password = props.getProperty("password")
        Class.forName(driver)
        val connection = DriverManager.getConnection(url + "?user=" + user + "&password=" + password)
        this.parsedQuery = connection.prepareStatement(insertionString)
    
    LOG.info("\n\nConfiguring DB connection parameters\n\n")
    }
}

And this is my main program:

package FlinkCEPClasses

import PostgresConnector.PostgreSink
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import java.util.Properties

import org.apache.flink.api.common.ExecutionConfig
import org.slf4j.{Logger, LoggerFactory}

class FlinkCEPPipeline {

  val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPPipeline])
  LOG.info("\n\nStarting the pipeline...\n\n")
  
  var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  env.enableCheckpointing(10)
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  env.setParallelism(1)

  //var input : DataStream[String] = env.readFile(new TextInputFormat(new Path("/home/luca/Desktop/lines")),"/home/luca/Desktop/lines",FileProcessingMode.PROCESS_CONTINUOUSLY,1)

  var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines").name("Raw stream")
  
  var tupleStream : DataStream[(String,String,String,String)] = input.map(new S2PMapFunction()).name("Tuple Stream")
  
  var properties : Properties = new Properties()
  
  properties.setProperty("driver","org.postgresql.Driver")
  properties.setProperty("url","jdbc:postgresql://localhost:5432/mydb")
  properties.setProperty("user","luca")
  properties.setProperty("password","root")
  
  tupleStream.addSink(new PostgreSink(properties,env.getConfig)).name("Postgres Sink").setParallelism(1)
  tupleStream.writeAsText("/home/luca/Desktop/output",FileSystem.WriteMode.OVERWRITE).name("File Sink").setParallelism(1)

  env.execute()


}

My S2PMapFunction code:

package FlinkCEPClasses

import org.apache.flink.api.common.functions.MapFunction

case class S2PMapFunction() extends MapFunction[String,(String,String,String,String)] {
    
    override def map(value: String): (String, String, String,String) = {
    
    
            var tuple = value.replaceAllLiterally("(","").replaceAllLiterally(")","").split(',')
    
            (tuple(0),tuple(1),tuple(2),tuple(3))
        
    }
}

My pipeline works like this: I read lines from a file, map them to tuples of strings, and use the data inside the tuples to save them in a Postgres DB.

If you want to simulate the data, just create a file with lines in a format like this: (field1,field2,pointx,pointy)
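
If it helps, here is a tiny throwaway generator (my own helper, not part of the pipeline) that writes a few lines in that format to the path the pipeline reads from; the concrete field values are made up.

import java.io.PrintWriter

object SampleLines {

  def main(args: Array[String]): Unit = {
    // Path taken from the pipeline above; adjust it for your machine.
    val out = new PrintWriter("/home/luca/Desktop/lines")
    // Each line follows the (field1,field2,pointx,pointy) format expected by S2PMapFunction.
    (1 to 5).foreach(i => out.println(s"(field$i,value$i,${i * 1.0},${i * 2.0})"))
    out.close()
  }
}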

Edit

The execution order of the TwoPhaseCommitSinkFunction's methods is the following:

Starting pipeline...
beginTransaction
preCommit
beginTransaction
commit
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
abort

Solution

So, here goes the "answer" for this question. Just to be clear: at this moment, the problem with the TwoPhaseCommitSinkFunction hasn't been solved yet. If you're looking for an answer to the original problem, then you should look for another answer. If you don't care about what you'll use as a sink, then maybe I can help you with that.

As suggested by @DavidAnderson, I started to study the Table API to see if it could solve my problem, which was using Flink to insert lines into my database table.

It turned out to be really simple, as you'll see.

OBS: Beware of the version you are using. My Flink version is 1.9.0.

Source code

package FlinkCEPClasses

import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sinks.TableSink
import org.postgresql.Driver

class TableAPIPipeline {

    // --- normal pipeline initialization in this block ---

    var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(10)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(1)


    var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines").name("Original stream")

    var tupleStream : DataStream[(String,Timestamp,Double,Double)] = input.map(new S2PlacaMapFunction()).name("Tuple Stream")

    var properties : Properties = new Properties()

    properties.setProperty("driver","org.postgresql.Driver")
    properties.setProperty("url","jdbc:postgresql://localhost:5432/mydb")
    properties.setProperty("user","myuser")
    properties.setProperty("password","mypassword")

    // --- normal pipeline initialization in this block END ---

    // These two lines create what Flink calls StreamTableEnvironment. 
    // It seems pretty similar to a normal stream initialization.
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env,settings)

    //Since I wanted to sink data into a database, I used JDBC TableSink,
    //because it is very intuitive and an exact match for my needs. You may
    //look for other TableSink classes that fit better in your solution.
    var tableSink : JDBCAppendTableSink = JDBCAppendTableSink.builder()
    .setBatchSize(1)
    .setDBUrl("jdbc:postgresql://localhost:5432/mydb")
    .setDrivername("org.postgresql.Driver")
    .setPassword("mypassword")
    .setUsername("myuser")
    .setQuery("INSERT INTO mytable (data1,data2,data3) VALUES (?,?,point(?,?))")
    .setParameterTypes(Types.STRING,Types.SQL_TIMESTAMP,Types.DOUBLE,Types.DOUBLE)
    .build()

    val fieldNames = Array("data1","data2","data3","data4")
    val fieldTypes = Array[TypeInformation[_]](Types.STRING,Types.SQL_TIMESTAMP,Types.DOUBLE, Types.DOUBLE)



    // This is the crucial part of the code: first, you need to register
    // your table sink, informing the name, the field names, field types and
    // the TableSink object.

    tableEnv.registerTableSink("postgres-table-sink",
        fieldNames,
        fieldTypes,
        tableSink
    )

    // Then, you transform your DataStream into a Table object.
    var table = tableEnv.fromDataStream(tupleStream)

    // Finally, you insert your stream data into the registered sink.
    table.insertInto("postgres-table-sink")




    env.execute()

}
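
For completeness, these are roughly the build dependencies this solution relies on (the JDBC table sink plus the Blink planner). The exact artifact names and the Postgres driver version are my best guess for Flink 1.9.0; check them against the 1.9 documentation for your Scala version.

// build.sbt (assumed artifact names for Flink 1.9.0)
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"                  % "1.9.0",
  "org.apache.flink" %% "flink-streaming-scala"        % "1.9.0",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.9.0", // StreamTableEnvironment for Scala
  "org.apache.flink" %% "flink-table-planner-blink"    % "1.9.0", // Blink planner used by EnvironmentSettings above
  "org.apache.flink" %% "flink-jdbc"                   % "1.9.0", // provides JDBCAppendTableSink
  "org.postgresql"    % "postgresql"                   % "42.2.8" // PostgreSQL JDBC driver
)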
