Regarding Spark DataFrameReader jdbc


Problem description

I have a question regarding the mechanics of Spark DataFrameReader. I would appreciate it if anybody could help me. Let me explain the scenario here.

I am creating a DataFrame from a DStream like this. This is the input data:

    var config = new HashMap[String, String]()
    config += ("zookeeper.connect" -> zookeeper)
    config += ("partition.assignment.strategy" -> "roundrobin")
    config += ("bootstrap.servers" -> broker)
    config += ("serializer.class" -> "kafka.serializer.DefaultEncoder")
    config += ("group.id" -> "default")

    val lines = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, config.toMap, Set(topic)).map(_._2)

    lines.foreachRDD { rdd =>

        if (!rdd.isEmpty()) {

            val rddJson = rdd.map { x => MyFunctions.mapToJson(x) }

            val sqlContext = SQLContextSingleton.getInstance(ssc.sparkContext)

            val rddDF = sqlContext.read.json(rddJson)

            rddDF.registerTempTable("inputData")

            val dbDF = ReadDataFrameHelper.readDataFrameHelperFromDB(sqlContext, jdbcUrl, "ABCD", "A", numOfPartiton, lowerBound, upperBound)

Here is the code of ReadDataFrameHelper:

    def readDataFrameHelperFromDB(sqlContext: HiveContext, jdbcUrl: String, dbTableOrQuery: String,
            columnToPartition: String, numOfPartiton: Int, lowerBound: Int, highBound: Int): DataFrame = {

        val jdbcDF = sqlContext.read.jdbc(url = jdbcUrl, table = dbTableOrQuery,
                columnName = columnToPartition,
                lowerBound = lowerBound,
                upperBound = highBound,
                numPartitions = numOfPartiton,
                connectionProperties = new java.util.Properties())

        jdbcDF
    }
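
For reference, a minimal sketch (the bounds and partition count below are made up for illustration; only the table "ABCD" and partition column "A" come from the question) of how Spark interprets these arguments: the lower/upper bounds and numPartitions only decide how the partition column's range is split into parallel queries, they do not filter the rows that are read.

    // Illustrative only: with these values Spark issues three range queries, roughly
    //   SELECT ... FROM ABCD WHERE A < 100 OR A IS NULL
    //   SELECT ... FROM ABCD WHERE A >= 100 AND A < 200
    //   SELECT ... FROM ABCD WHERE A >= 200
    // so every row of the table is still read; the bounds only control parallelism.
    val parallelScan = sqlContext.read.jdbc(
        url = jdbcUrl,
        table = "ABCD",
        columnName = "A",
        lowerBound = 0L,
        upperBound = 300L,
        numPartitions = 3,
        connectionProperties = new java.util.Properties())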

Lastly, I am doing a join like this:

 val joinedData = rddDF.join(dbDF,rddDF("ID") === dbDF("ID")
                                 && rddDF("CODE") === dbDF("CODE"),"left_outer")
                        .drop(dbDF("code"))
                        .drop(dbDF("id"))
                        .drop(dbDF("number"))
                        .drop(dbDF("key"))
                        .drop(dbDF("loaddate"))
                        .drop(dbDF("fid"))
joinedData.show()

My input DStream will have 1000 rows, while the database table contains millions of rows. So when I do this join, will Spark load and read all the rows from the database, or will it read only those specific rows from the DB that have the code and id values present in the input DStream?

Answer

As pointed out by zero323, I have also confirmed that the data is read in full from the table. I checked the DB session logs and saw that the whole dataset was loaded.

Thanks to zero323.
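
If only the rows matching the current micro-batch are actually needed, one possible workaround (a sketch under assumptions, not part of the original answer: it assumes ID is numeric and the batch's key set is small) is to collect the keys from the streaming DataFrame and pass a subquery instead of the bare table name, so the filtering happens in the database:

    // Sketch only: push the (small) set of IDs from the current batch into the
    // query that the JDBC source runs. "ABCD" and "ID" follow the question;
    // the subquery alias is illustrative. Guard against an empty batch before
    // building the IN clause in real code.
    val ids = rddDF.select("ID").distinct().collect().map(_.get(0)).mkString(",")
    val filteredQuery = s"(SELECT * FROM ABCD WHERE ID IN ($ids)) AS t"
    val dbDF = sqlContext.read.jdbc(jdbcUrl, filteredQuery, new java.util.Properties())

The subsequent join stays the same; whether this is cheaper than the full scan depends on the key cardinality and on the database.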

