Regarding Spark Dataframereader jdbc


Problem description

I have a question regarding the mechanics of Spark DataFrameReader. I would appreciate it if anybody could help me. Let me explain the scenario here.

I am creating a DataFrame from a DStream like this. This is the input data:

    var config = new HashMap[String, String]()
    config += ("zookeeper.connect" -> zookeeper)
    config += ("partition.assignment.strategy" -> "roundrobin")
    config += ("bootstrap.servers" -> broker)
    config += ("serializer.class" -> "kafka.serializer.DefaultEncoder")
    config += ("group.id" -> "default")

    val lines = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, config.toMap, Set(topic)).map(_._2)

    lines.foreachRDD { rdd =>

        if (!rdd.isEmpty()) {

            val rddJson = rdd.map { x => MyFunctions.mapToJson(x) }

            val sqlContext = SQLContextSingleton.getInstance(ssc.sparkContext)

            val rddDF = sqlContext.read.json(rddJson)

            rddDF.registerTempTable("inputData")

            val dbDF = ReadDataFrameHelper.readDataFrameHelperFromDB(sqlContext, jdbcUrl, "ABCD", "A",
                numOfPartiton, lowerBound, upperBound)

Here is the code of ReadDataFrameHelper:

    def readDataFrameHelperFromDB(sqlContext: HiveContext, jdbcUrl: String, dbTableOrQuery: String,
            columnToPartition: String, numOfPartiton: Int, lowerBound: Int, highBound: Int): DataFrame = {

        val jdbcDF = sqlContext.read.jdbc(url = jdbcUrl, table = dbTableOrQuery,
            columnName = columnToPartition,
            lowerBound = lowerBound,
            upperBound = highBound,
            numPartitions = numOfPartiton,
            connectionProperties = new java.util.Properties())

        jdbcDF
    }
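
For context on the helper above: the columnName / lowerBound / upperBound / numPartitions arguments only control how the scan is parallelized, not which rows come back. Spark splits the bound range of the partition column into one stride query per partition, roughly like the sketch below (illustrative, not the exact SQL Spark generates):

    // Illustrative only: with numPartitions = 3, lowerBound = 0, upperBound = 300
    // on partition column "A", the JDBC source issues roughly one query per partition:
    //
    //   SELECT * FROM ABCD WHERE A < 100 OR A IS NULL    -- partition 0
    //   SELECT * FROM ABCD WHERE A >= 100 AND A < 200    -- partition 1
    //   SELECT * FROM ABCD WHERE A >= 200                -- partition 2
    //
    // Every row of the table is still read; the bounds only shape the partitions.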

Lastly, I am doing a join like this:

 val joinedData = rddDF.join(dbDF,rddDF("ID") === dbDF("ID")
                                 && rddDF("CODE") === dbDF("CODE"),"left_outer")
                        .drop(dbDF("code"))
                        .drop(dbDF("id"))
                        .drop(dbDF("number"))
                        .drop(dbDF("key"))
                        .drop(dbDF("loaddate"))
                        .drop(dbDF("fid"))
joinedData.show()

My input DStream will have 1000 rows, while the database table contains millions of rows. So when I do this join, will Spark load and read all the rows from the database, or will it read only those specific rows from the DB whose code and id come from the input DStream?
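
One way to see what the join actually pushes down to the database is to inspect the physical plan (a quick check, not part of the original question; assumes the Spark 1.6-era DataFrame API used above):

    // Print the extended plan of the join.
    joinedData.explain(true)
    // In the physical plan, the read of ABCD appears as a JDBCRelation scan; any
    // predicates pushed to the database show up there as PushedFilters: [...].
    // Join keys are not filters, so nothing restricts the scan of ABCD here.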

Answer

As zero323 pointed out, I have also confirmed that the data is read in full from the table. I checked the DB session logs and saw that the whole dataset was being loaded.
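
If reading the whole table on every batch is too costly, one common workaround (not part of the original answer) is to pass a derived-table subquery as the table argument, so the database itself restricts the rows before Spark fetches them. A minimal sketch, reusing the variables from the question; the loaddate predicate and cutoff value are purely illustrative placeholders:

    // Hypothetical: restrict the rows in the database instead of after the join.
    // The WHERE clause below is a placeholder, not from the original question.
    val filteredQuery = "(SELECT * FROM ABCD WHERE loaddate >= '2016-01-01') t"

    val dbDF = sqlContext.read.jdbc(
        url = jdbcUrl,
        table = filteredQuery,          // the WHERE clause runs inside the database
        columnName = "A",
        lowerBound = lowerBound,
        upperBound = upperBound,
        numPartitions = numOfPartiton,
        connectionProperties = new java.util.Properties())

Alternatively, collecting the distinct join keys from each micro-batch and building the subquery from them would cut the read further, at the cost of an extra query per batch.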

Thanks to zero323.

