Can't understand TwoPhaseCommitSinkFunction lifecycle


Question

I needed a sink to a Postgres DB, so I started to build a custom Flink SinkFunction. Since FlinkKafkaProducer implements TwoPhaseCommitSinkFunction, I decided to do the same. As stated in O'Reilly's book Stream Processing with Apache Flink, you just need to implement the abstract methods, enable checkpointing, and you're good to go. But what really happens when I run my code is that the commit method is called only once, and it is called before invoke, which is totally unexpected, since you shouldn't be ready to commit if your set of ready-to-commit transactions is empty. Worse still, after committing, invoke is called for all of the transaction lines present in my file, and then abort is called, which is even more unexpected.

When the Sink is initialized, it is my understanding that the following should occur:

  1. beginTransaction is called and sends an identifier to invoke
  2. invoke adds the lines to the transaction, according to the identifier it received
  3. pre-commit makes all the final modifications on the current transaction's data
  4. commit handles the finalized transaction of the pre-committed data

So, I can't see why my program doesn't show this behaviour.

Here goes my sink's code:

package PostgresConnector

import java.sql.{BatchUpdateException, DriverManager, PreparedStatement, SQLException, Timestamp}
import java.text.ParseException
import java.util.{Date, Properties, UUID}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}
import org.apache.flink.streaming.api.scala._
import org.slf4j.{Logger, LoggerFactory}




class PostgreSink(props : Properties, config : ExecutionConfig) extends TwoPhaseCommitSinkFunction[(String,String,String,String),String,String](createTypeInformation[String].createSerializer(config),createTypeInformation[String].createSerializer(config)){
    
    private var transactionMap : Map[String,Array[(String,String,String,String)]] = Map()
    
    private var parsedQuery : PreparedStatement = _
    
    private val insertionString : String = "INSERT INTO mydb (field1,field2,point) values (?,?,point(?,?))"
    
    override def invoke(transaction: String, value: (String,String,String,String), context: SinkFunction.Context[_]): Unit = {
    
        val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
        
        val res = this.transactionMap.get(transaction)
        
        if(res.isDefined){
    
            var array = res.get
            
            array = array ++ Array(value)
    
            this.transactionMap += (transaction -> array)
            
        }else{
    
            val array = Array(value)
    
            this.transactionMap += (transaction -> array)
            
            
        }
    
        LOG.info("\n\nPassing through invoke\n\n")
        
        ()
        
    }
    
    override def beginTransaction(): String = {
    
        val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
        
        val identifier = UUID.randomUUID.toString
    
        LOG.info("\n\nPassing through beginTransaction\n\n")
        
        identifier
        
        
    }
    
    override def preCommit(transaction: String): Unit = {
        
        val LOG = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
    
        try{
        
            val tuple : Option[Array[(String,String,String,String)]]= this.transactionMap.get(transaction)
        
            if(tuple.isDefined){
            
                tuple.get.foreach( (value : (String,String,String,String)) => {
                
                    LOG.info("\n\n"+value.toString()+"\n\n")
                
                    this.parsedQuery.setString(1,value._1)
                    this.parsedQuery.setString(2,value._2)
                    this.parsedQuery.setString(3,value._3)
                    this.parsedQuery.setString(4,value._4)
                    this.parsedQuery.addBatch()
                
                })
                
            }
        
        }catch{
        
            case e : SQLException =>
                LOG.info("\n\nError when adding transaction to batch: SQLException\n\n")
        
            case f : ParseException =>
                LOG.info("\n\nError when adding transaction to batch: ParseException\n\n")
        
            case g : NoSuchElementException =>
                LOG.info("\n\nError when adding transaction to batch: NoSuchElementException\n\n")
        
            case h : Exception =>
                LOG.info("\n\nError when adding transaction to batch: Exception\n\n")
        
        }
        
        this.transactionMap = this.transactionMap.empty
    
        LOG.info("\n\nPassing through preCommit...\n\n")
    }
    
    override def commit(transaction: String): Unit = {
    
        val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
        
        if(this.parsedQuery != null) {
            LOG.info("\n\n" + this.parsedQuery.toString+ "\n\n")
        }
        
        try{
            
            this.parsedQuery.executeBatch
            val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
            LOG.info("\n\nExecuting batch\n\n")
            
        }catch{
    
            case e : SQLException =>
                val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
                LOG.info("\n\n"+"Error : SQLException"+"\n\n")
            
        }
        
        this.transactionMap = this.transactionMap.empty
    
        LOG.info("\n\nPassing through commit...\n\n")
        
    }
    
    override def abort(transaction: String): Unit = {
    
        val LOG : Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
        
        this.transactionMap = this.transactionMap.empty
    
        LOG.info("\n\nPassing through abort...\n\n")
        
    }
    
    override def open(parameters: Configuration): Unit = {
    
        val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPClasses.FlinkCEPPipeline])
        
        val driver = props.getProperty("driver")
        val url = props.getProperty("url")
        val user = props.getProperty("user")
        val password = props.getProperty("password")
        Class.forName(driver)
        val connection = DriverManager.getConnection(url + "?user=" + user + "&password=" + password)
        this.parsedQuery = connection.prepareStatement(insertionString)
    
        LOG.info("\n\nConfiguring BD conection parameters\n\n")
    }
}

And here goes my main program:

package FlinkCEPClasses

import PostgresConnector.PostgreSink
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.pattern.conditions.SimpleCondition
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import java.util.Properties

import org.apache.flink.api.common.ExecutionConfig
import org.slf4j.{Logger, LoggerFactory}

class FlinkCEPPipeline {

  val LOG: Logger = LoggerFactory.getLogger(classOf[FlinkCEPPipeline])
  LOG.info("\n\nStarting the pipeline...\n\n")
  
  var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  env.enableCheckpointing(10)
  env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
  env.setParallelism(1)

  //var input : DataStream[String] = env.readFile(new TextInputFormat(new Path("/home/luca/Desktop/lines")),"/home/luca/Desktop/lines",FileProcessingMode.PROCESS_CONTINUOUSLY,1)

  var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines").name("Raw stream")
  
  var tupleStream : DataStream[(String,String,String,String)] = input.map(new S2PMapFunction()).name("Tuple Stream")
  
  var properties : Properties = new Properties()
  
  properties.setProperty("driver","org.postgresql.Driver")
  properties.setProperty("url","jdbc:postgresql://localhost:5432/mydb")
  properties.setProperty("user","luca")
  properties.setProperty("password","root")
  
  tupleStream.addSink(new PostgreSink(properties,env.getConfig)).name("Postgres Sink").setParallelism(1)
  tupleStream.writeAsText("/home/luca/Desktop/output",FileSystem.WriteMode.OVERWRITE).name("File Sink").setParallelism(1)

  env.execute()


}

My S2PMapFunction code:

package FlinkCEPClasses

import org.apache.flink.api.common.functions.MapFunction

case class S2PMapFunction() extends MapFunction[String,(String,String,String,String)] {
    
    override def map(value: String): (String, String, String, String) = {

        val tuple = value.replaceAllLiterally("(", "").replaceAllLiterally(")", "").split(',')

        (tuple(0), tuple(1), tuple(2), tuple(3))

    }
}
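
As a quick sanity check of the mapping outside of Flink, a tiny throwaway driver like the sketch below can be used (the object name and the sample line are hypothetical, not part of the original pipeline):

package FlinkCEPClasses

object S2PMapFunctionCheck extends App {

    // A made-up line in the same format the pipeline reads from the input file.
    val sample = "(field1,field2,10.5,20.3)"

    // Should print (field1,field2,10.5,20.3)
    println(S2PMapFunction().map(sample))
}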

My pipeline works like this: I read lines from a file, map them to a tuple of strings, and use the data inside the tuples to save them in a Postgres DB.

If you want to simulate the data, just create a file with lines in a format like this: (field1,field2,pointx,pointy)
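
For example, a file with a few made-up lines such as these would do:

(firstA,secondA,10.5,20.3)
(firstB,secondB,11.2,21.7)
(firstC,secondC,12.8,22.4)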

The execution order of the TwoPhaseCommitSinkFunction's methods is the following:

Starting pipeline...
beginTransaction
preCommit
beginTransaction
commit
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
invoke
abort

Answer

So, here goes the "answer" for this question. Just to be clear: at this moment, the problem with the TwoPhaseCommitSinkFunction hasn't been solved yet. If what you're looking for is an answer to the original problem, then you should look for another answer. If you don't care about what you'll use as a sink, then maybe I can help you with that.

As suggested by @DavidAnderson, I started to study the Table API to see if it could solve my problem, which was using Flink to insert lines into my database table.

It turned out to be really simple, as you'll see.

Note: beware of the version you are using. My Flink version is 1.9.0.
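
For reference, here is a minimal sketch of the sbt dependencies this approach relies on for Flink 1.9.0 (the artifact list and the PostgreSQL driver version are my assumption; adjust the Scala suffix and versions to your setup):

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"        % "1.9.0" % "provided",
  "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.9.0",
  "org.apache.flink" %% "flink-table-planner-blink"    % "1.9.0", // Blink planner used below
  "org.apache.flink" %% "flink-jdbc"                   % "1.9.0", // provides JDBCAppendTableSink
  "org.postgresql"    % "postgresql"                   % "42.2.5" // JDBC driver; version is an assumption
)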

Source code:

package FlinkCEPClasses

import java.sql.Timestamp
import java.util.Properties

import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{EnvironmentSettings, Table}
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.sinks.TableSink
import org.postgresql.Driver

class TableAPIPipeline {

    // --- normal pipeline initialization in this block ---

    var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(10)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    env.setParallelism(1)


    var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines").name("Original stream")

    var tupleStream : DataStream[(String,Timestamp,Double,Double)] = input.map(new S2PlacaMapFunction()).name("Tuple Stream")

    var properties : Properties = new Properties()

    properties.setProperty("driver","org.postgresql.Driver")
    properties.setProperty("url","jdbc:postgresql://localhost:5432/mydb")
    properties.setProperty("user","myuser")
    properties.setProperty("password","mypassword")

    // --- normal pipeline initialization in this block END ---

    // These two lines create what Flink calls a StreamTableEnvironment.
    // It seems pretty similar to a normal stream initialization.
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env,settings)

    //Since I wanted to sink data into a database, I used the JDBC TableSink,
    //because it is very intuitive and an exact match for my need. You may
    //look for other TableSink classes that fit better in your solution.
    var tableSink : JDBCAppendTableSink = JDBCAppendTableSink.builder()
    .setBatchSize(1)
    .setDBUrl("jdbc:postgresql://localhost:5432/mydb")
    .setDrivername("org.postgresql.Driver")
    .setPassword("mypassword")
    .setUsername("myuser")
    .setQuery("INSERT INTO mytable (data1,data2,data3) VALUES (?,?,point(?,?))")
    .setParameterTypes(Types.STRING,Types.SQL_TIMESTAMP,Types.DOUBLE,Types.DOUBLE)
    .build()

    val fieldNames = Array("data1","data2","data3","data4")
    val fieldTypes = Array[TypeInformation[_]](Types.STRING,Types.SQL_TIMESTAMP,Types.DOUBLE, Types.DOUBLE)



    // This is the crucial part of the code: first, you need to register
    // your table sink, providing its name, the field names, the field types and
    // the TableSink object.

    tableEnv.registerTableSink("postgres-table-sink",
        fieldNames,
        fieldTypes,
        tableSink
    )

    // Then, you transform your DataStream into a Table object.
    var table = tableEnv.fromDataStream(tupleStream)

    // Finally, you insert your stream data into the registered sink.
    table.insertInto("postgres-table-sink")




    env.execute()

}
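
The target table has to exist and match the INSERT query above. A hypothetical one-off helper to create it could look like the sketch below (the column types are inferred from the query and the registered parameter types, so treat them as an assumption):

package FlinkCEPClasses

import java.sql.DriverManager

object CreateTargetTable extends App {

    // Inferred from the INSERT above: data1 is a string, data2 a timestamp,
    // and data3 a Postgres point built from two doubles.
    Class.forName("org.postgresql.Driver")

    val connection = DriverManager.getConnection(
        "jdbc:postgresql://localhost:5432/mydb", "myuser", "mypassword")

    val statement = connection.createStatement()
    statement.execute(
        "CREATE TABLE IF NOT EXISTS mytable (data1 TEXT, data2 TIMESTAMP, data3 POINT)")

    statement.close()
    connection.close()
}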
