Spark Streaming Saving data to MySQL with foreachRDD() in Scala
Problem Description
Please, can somebody give me a functional example of saving a Spark Streaming job to a MySQL DB using foreachRDD() in Scala? I have the code below, but it's not working. I just need a simple example, not syntax or theory.
Thanks!
package examples

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import java.util.Properties
import org.apache.spark.sql.SaveMode

object StreamingToMysql {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val hiveCtx = new HiveContext(sc)
    import hiveCtx.implicits._

    val ssc = new StreamingContext(sc, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)
    ssc.checkpoint("hdfs://localhost:54310/user/hduser/Streaming/logs")

    val rdd = sc.parallelize(List(1))
    val df = rdd.toDF()

    val split = lines.map(line => line.split(","))
    val input = split.map(x => x(0))

    input.foreachRDD { rdd =>
      if (rdd.take(1).size == 1) {
        rdd.foreachPartition { iterator =>
          iterator.foreach {
            val connectionProperties = new Properties()
            connectionProperties.put("user", "root")
            connectionProperties.put("password", "admin123")
            iterator.write.mode("append")
              .jdbc("jdbc:mysql://192.168.100.8:3306/hadoopguide", "topics", connectionProperties)
          }
        }
      }
    }

    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "admin123")
    df.write.mode("append")
      .jdbc("jdbc:mysql://192.168.100.8:3306/hadoopguide", "topics", connectionProperties)

    println("Done")

    ssc.start()
    ssc.awaitTermination()
  }
}
Recommended Answer
To write data from Spark Streaming to an external system, you can use the high-level Dataframe API or the low-level RDD API. In the code above, both approaches are mixed together and won't work.
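For reference, the low-level RDD route follows the standard foreachRDD / foreachPartition design pattern, opening one plain JDBC connection per partition on the executors. Here is a minimal sketch (the INSERT statement and the single name column are assumptions about the target table, not code from the question):

import java.sql.DriverManager

input.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One connection per partition, created on the executor
    val conn = DriverManager.getConnection(
      "jdbc:mysql://192.168.100.8:3306/hadoopguide", "root", "admin123")
    val stmt = conn.prepareStatement("INSERT INTO topics (name) VALUES (?)")
    partition.foreach { record =>
      stmt.setString(1, record)
      stmt.executeUpdate()
    }
    stmt.close()
    conn.close()
  }
}

The Dataframe route, shown below, is usually more convenient.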
Assuming that you know the structure of the incoming data in Spark Streaming, you can create a Dataframe out of each RDD and use the Dataframe API to save it:
First, create a schema for the data:
case class MyStructure(field: Type,....)
Then, apply the schema to the incoming stream:
val structuredData = dstream.map(record => MyStructure(record.field1, ... record.fieldn))
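For example, with the comma-separated socket input from the question, where only the first field is kept, this could be (the Topic case class and its single name field are illustrative assumptions):

case class Topic(name: String)

val structuredData = lines.map(line => Topic(line.split(",")(0)))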
Now use foreachRDD to transform each RDD in the DStream into a Dataframe and use the DF API to save it:
// JDBC writer configuration
val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "*****")
structuredData.foreachRDD { rdd =>
  val df = rdd.toDF() // create a dataframe from the schema RDD
  df.write.mode("append")
    .jdbc("jdbc:mysql://192.168.100.8:3306/hadoopguide", "topics", connectionProperties)
}
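Putting it all together, here is a minimal end-to-end sketch under the same assumptions (the single-field Topic schema and the connection details are carried over from the question). Note that the case class must be defined outside main so that toDF() can derive its schema, and the MySQL JDBC driver (mysql-connector-java) must be on the classpath:

package examples

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingToMysql {

  // Schema for the first comma-separated field of each line (an assumption)
  case class Topic(name: String)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("StreamingToMysql").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sc, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)

    // Apply the schema to the incoming stream
    val structuredData = lines.map(line => Topic(line.split(",")(0)))

    // JDBC writer configuration
    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "*****")

    structuredData.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val df = rdd.toDF() // create a dataframe from the schema RDD
        df.write.mode("append")
          .jdbc("jdbc:mysql://192.168.100.8:3306/hadoopguide", "topics", connectionProperties)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}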