关于Spark Dataframereader jdbc [英] Regarding Spark Dataframereader jdbc
问题描述
我对Spark Dataframereader的力学存在疑问.如果有人可以帮助我,我将不胜感激.让我在这里解释场景
I have a question regarding Mechanics of Spark Dataframereader. I will appreciate if anybody can help me. Let me explain the Scenario here
我正在像这样从Dstream创建一个DataFrame.在输入数据中
I am creating a DataFrame from Dstream like this. This in Input Data
var config = new HashMap[String,String]();
config += ("zookeeper.connect" ->zookeeper);
config += ("partition.assignment.strategy" ->"roundrobin");
config += ("bootstrap.servers" ->broker);
config += ("serializer.class" -> "kafka.serializer.DefaultEncoder");
config += ("group.id" -> "default");
val lines = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,config.toMap,Set(topic)).map(_._2)
lines.foreachRDD { rdd =>
if(!rdd.isEmpty()){
val rddJson = rdd.map { x => MyFunctions.mapToJson(x) }
val sqlContext = SQLContextSingleton.getInstance(ssc.sparkContext)
val rddDF = sqlContext.read.json(rddJson)
rddDF.registerTempTable("inputData")
val dbDF = ReadDataFrameHelper.readDataFrameHelperFromDB(sqlContext, jdbcUrl, "ABCD","A",numOfPartiton,lowerBound,upperBound)
这是ReadDataFrameHelper的代码
Here is the code of ReadDataFrameHelper
def readDataFrameHelperFromDB(sqlContext:HiveContext,jdbcUrl:String,dbTableOrQuery:String,
columnToPartition:String,numOfPartiton:Int,lowerBound:Int,highBound:Int):DataFrame={
val jdbcDF = sqlContext.read.jdbc(url = jdbcUrl, table = dbTableOrQuery,
columnName = columnToPartition,
lowerBound = lowerBound,
upperBound = highBound,
numPartitions = numOfPartiton,
connectionProperties = new java.util.Properties()
)
jdbcDF
}
最后,我正在像这样进行加入
Lastly i am doing a Join like this
val joinedData = rddDF.join(dbDF,rddDF("ID") === dbDF("ID")
&& rddDF("CODE") === dbDF("CODE"),"left_outer")
.drop(dbDF("code"))
.drop(dbDF("id"))
.drop(dbDF("number"))
.drop(dbDF("key"))
.drop(dbDF("loaddate"))
.drop(dbDF("fid"))
joinedData.show()
我的输入DStream将具有1000行,而数据将包含一百万行.因此,当我执行此连接时,将触发从数据库加载所有行并读取这些行,或者仅从DB中读取那些具有输入DStream中的code,id
的特定行
My input DStream will have 1000 rows and data will contains million of rows. So when i do this join, will spark load all the rows from database and read those rows or will this just read the those specific rows from DB which have the code,id
from the input DStream
推荐答案
如zero323所指定,我还确认将从表中读取全部数据.我检查了数据库会话日志,发现整个数据集都已加载.
As specified by zero323, i have also confirmed that data will be read full from the table. I checked the DB session logs and saw that whole dataset is getting loaded.
感谢zero323
这篇关于关于Spark Dataframereader jdbc的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!