关于 Spark Dataframereader jdbc [英] Regarding Spark Dataframereader jdbc
问题描述
我有一个关于 Spark Dataframereader 机制的问题.如果有人可以帮助我,我将不胜感激.让我在这里解释一下场景
I have a question regarding Mechanics of Spark Dataframereader. I will appreciate if anybody can help me. Let me explain the Scenario here
我正在像这样从 Dstream 创建一个 DataFrame.这在输入数据
I am creating a DataFrame from Dstream like this. This in Input Data
var config = new HashMap[String,String]();
config += ("zookeeper.connect" ->zookeeper);
config += ("partition.assignment.strategy" ->"roundrobin");
config += ("bootstrap.servers" ->broker);
config += ("serializer.class" -> "kafka.serializer.DefaultEncoder");
config += ("group.id" -> "default");
val lines = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc,config.toMap,Set(topic)).map(_._2)
lines.foreachRDD { rdd =>
if(!rdd.isEmpty()){
val rddJson = rdd.map { x => MyFunctions.mapToJson(x) }
val sqlContext = SQLContextSingleton.getInstance(ssc.sparkContext)
val rddDF = sqlContext.read.json(rddJson)
rddDF.registerTempTable("inputData")
val dbDF = ReadDataFrameHelper.readDataFrameHelperFromDB(sqlContext, jdbcUrl, "ABCD","A",numOfPartiton,lowerBound,upperBound)
这里是ReadDataFrameHelper的代码
Here is the code of ReadDataFrameHelper
def readDataFrameHelperFromDB(sqlContext:HiveContext,jdbcUrl:String,dbTableOrQuery:String,
columnToPartition:String,numOfPartiton:Int,lowerBound:Int,highBound:Int):DataFrame={
val jdbcDF = sqlContext.read.jdbc(url = jdbcUrl, table = dbTableOrQuery,
columnName = columnToPartition,
lowerBound = lowerBound,
upperBound = highBound,
numPartitions = numOfPartiton,
connectionProperties = new java.util.Properties()
)
jdbcDF
}
最后我正在做这样的加入
Lastly I am doing a Join like this
val joinedData = rddDF.join(dbDF,rddDF("ID") === dbDF("ID")
&& rddDF("CODE") === dbDF("CODE"),"left_outer")
.drop(dbDF("code"))
.drop(dbDF("id"))
.drop(dbDF("number"))
.drop(dbDF("key"))
.drop(dbDF("loaddate"))
.drop(dbDF("fid"))
joinedData.show()
我的输入 DStream 将有 1000 行,数据将包含数百万行.因此,当我执行此连接时,将触发从数据库中加载所有行并读取这些行,或者仅从 DB 中读取那些具有来自输入 DStream
My input DStream will have 1000 rows and data will contains million of rows. So when I do this join, will spark load all the rows from database and read those rows or will this just read those specific rows from DB which have the code,id
from the input DStream
推荐答案
正如 zero323 所指定的,我还确认将从表中完整读取数据.我检查了数据库会话日志,看到整个数据集正在加载.
As specified by zero323, i have also confirmed that data will be read full from the table. I checked the DB session logs and saw that whole dataset is getting loaded.
感谢 zero323
这篇关于关于 Spark Dataframereader jdbc的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!