How to write two streaming df's into two different tables in MySQL in Spark Structured Streaming?

Problem description

I am using Spark version 2.3.2.

I have written code in Spark Structured Streaming to insert streaming DataFrame data into two different MySQL tables.

Let's say there are two streaming DataFrames: DF1 and DF2.

I have written two queries (query1, query2) using the ForeachWriter API to write each stream into its own MySQL table, i.e. DF1 into MySQL table A and DF2 into MySQL table B.

When I run the Spark job, it runs query1 first and then query2, so it writes to table A but not to table B.

If I change my code to run query2 first and then query1, it writes to table B but not to table A.

So I understand that it only executes whichever query comes first, and writes to that table alone.

Note: I have tried giving the two tables different MySQL users/databases, but no luck.

Can anyone please advise how to make this work?

My code is below:

import java.sql._
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.streaming.ProcessingTime

// ForeachWriter that inserts each row of the first stream into MySQL table A
class JDBCSink1(url: String, user: String, pwd: String) extends ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "com.mysql.jdbc.Driver"
  var connection: Connection = _

  // Open one JDBC connection per partition per epoch
  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    true
  }

  // Insert each incoming row into table A
  def process(value: org.apache.spark.sql.Row): Unit = {
    val insertSql = "INSERT INTO tableA(col1,col2,col3) VALUES(?,?,?)"
    val preparedStmt: PreparedStatement = connection.prepareStatement(insertSql)
    preparedStmt.setString(1, value(0).toString)
    preparedStmt.setString(2, value(1).toString)
    preparedStmt.setString(3, value(2).toString)
    preparedStmt.execute()
    preparedStmt.close() // close each statement so a long-running stream doesn't leak resources
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}



// ForeachWriter that inserts each row of the second stream into MySQL table B
class JDBCSink2(url: String, user: String, pwd: String) extends ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "com.mysql.jdbc.Driver"
  var connection: Connection = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection(url, user, pwd)
    true
  }

  // Insert each incoming row into table B
  def process(value: org.apache.spark.sql.Row): Unit = {
    val insertSql = "INSERT INTO tableB(col1,col2) VALUES(?,?)"
    val preparedStmt: PreparedStatement = connection.prepareStatement(insertSql)
    preparedStmt.setString(1, value(0).toString)
    preparedStmt.setString(2, value(1).toString)
    preparedStmt.execute()
    preparedStmt.close() // close each statement so a long-running stream doesn't leak resources
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close()
  }
}
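As an aside, the two sinks differ only in the INSERT statement and the number of bound columns, so one parameterized writer could replace both. A minimal sketch, reusing the imports above (this JDBCSink class and its constructor are an illustration, not part of the original code):

// Hypothetical generic sink: the INSERT statement and column count are constructor parameters
class JDBCSink(url: String, user: String, pwd: String, insertSql: String, numCols: Int)
    extends ForeachWriter[org.apache.spark.sql.Row] {
  var connection: Connection = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName("com.mysql.jdbc.Driver")
    connection = DriverManager.getConnection(url, user, pwd)
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    val stmt = connection.prepareStatement(insertSql)
    // Bind the first numCols columns of the row as strings
    for (i <- 0 until numCols) stmt.setString(i + 1, value(i).toString)
    stmt.execute()
    stmt.close()
  }

  def close(errorOrNull: Throwable): Unit = connection.close()
}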



val url1="jdbc:mysql://hostname:3306/db1"
val url2="jdbc:mysql://hostname:3306/db2"

val user1 ="usr1"
val user2="usr2"
val pwd = "password"

val Writer1 = new JDBCSink1(url1,user1, pwd)

val Writer2 = new JDBCSink2(url2,user2, pwd)


val query2 =
  streamDF2
    .writeStream
    .foreach(Writer2)
    .outputMode("append")
    .trigger(ProcessingTime("35 seconds"))
    .start().awaitTermination()



val query1 =
  streamDF1
    .writeStream
    .foreach(Writer1)
    .outputMode("append")
    .trigger(ProcessingTime("30 seconds"))
    .start().awaitTermination()

Answer

You are blocking the second query because of the awaitTermination. If you want to have two output streams, you need to start both before waiting for their termination:

val query2 =
  streamDF2
    .writeStream
    .foreach(Writer2)
    .outputMode("append")
    .trigger(ProcessingTime("35 seconds"))
    .start()

val query1 =
  streamDF1
    .writeStream
    .foreach(Writer1)
    .outputMode("append")
    .trigger(ProcessingTime("30 seconds"))
    .start()

query1.awaitTermination()
query2.awaitTermination()
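
Alternatively, if you simply want to block until any of the started queries stops or fails, you can wait on the session's StreamingQueryManager (assuming spark here is your SparkSession):

// Blocks until any one of the active streaming queries terminates
spark.streams.awaitAnyTermination()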


Spark also allows you to schedule and allocate resources to the different streaming queries as described in Scheduling Within an Application. You can configure your pools based on:

  • schedulingMode: can be FIFO or FAIR
  • weight: "This controls the pool's share of the cluster relative to other pools. By default, all pools have a weight of 1. If you give a specific pool a weight of 2, for example, it will get 2x more resources as other active pools."
  • minShare: "Apart from an overall weight, each pool can be given a minimum shares (as a number of CPU cores) that the administrator would like it to have."

The pool configurations can be set by creating an XML file, similar to conf/fairscheduler.xml.template, and either putting a file named fairscheduler.xml on the classpath, or setting the spark.scheduler.allocation.file property in your SparkConf:

conf.set("spark.scheduler.allocation.file", "/path/to/file")
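
For reference, a minimal fairscheduler.xml following the bundled template could look like this (the pool names pool1 and pool2 are placeholders matching the example below):

<?xml version="1.0"?>
<allocations>
  <pool name="pool1">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="pool2">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>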

Different pools can then be applied to the two queries like below:

// The pool is chosen via a thread-local property, so set it on the
// thread that starts each query, right before calling start()
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
val query1 = streamDF1.writeStream.[...].start()

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
val query2 = streamDF2.writeStream.[...].start()
