How to insert data into Hive using the foreach method in Spark Structured Streaming
Problem description
I am trying to insert data into a Hive table using the foreach method. I am using Spark 2.3.0.
Here is my code:
```scala
import org.apache.spark.sql.{ForeachWriter, Row}

df_drop_window.writeStream
  .foreach(new ForeachWriter[Row]() {
    override def open(partitionId: Long, epochId: Long): Boolean = true

    override def process(value: Row): Unit = {
      println(s">> Processing ${value}")
      // How do I convert `value` to a DataFrame?
    }

    override def close(errorOrNull: Throwable): Unit = {
    }
  }).outputMode("update").start()
```
As you can see above, I want to convert `value` to a DataFrame and then insert the data into a Hive table, along the lines of `insert into tablename (select * from dataframe)`. Can someone help me with how to do this? I am new to Spark streaming.
I can see only the following option available. Can someone tell me how to convert `value: Row` to a DataFrame?
I have tried the following, but I get an error (`org.apache.spark.SparkException: Task not serializable`):
```scala
import org.apache.spark.sql.{ForeachWriter, Row}

df.writeStream
  .foreach(new ForeachWriter[Row]() {
    override def open(partitionId: Long, epochId: Long): Boolean = true

    override def process(value: Row): Unit = {
      // sc and spark are driver-side objects, but process() runs on the executors
      val rowsRdd = sc.parallelize(Seq(value))
      val df2 = spark.createDataFrame(rowsRdd, schema)
      df2.createOrReplaceTempView("testing2")
      spark.sql("insert into table are.table_name1 Partition(date) select * from testing2")
    }

    override def close(errorOrNull: Throwable): Unit = {
    }
  }).outputMode("append").start()
```
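The `Task not serializable` failure can be reproduced without Spark. Spark ships the `ForeachWriter` instance to the executors using Java serialization, so every object it references must be serializable as well; `process` above refers to `sc` and `spark`, and `SparkContext` does not implement `java.io.Serializable`. The sketch below is a plain-Scala illustration only, assuming hypothetical `FakeSession` and `CapturingWriter` classes as stand-ins for the driver-side session and for `ForeachWriter[Row]`; neither is a Spark class.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a driver-only object such as SparkContext:
// it does NOT extend Serializable.
class FakeSession {
  def sql(query: String): Unit = println(s"running: $query")
}

// Hypothetical stand-in for ForeachWriter[Row], which is Serializable.
// Because process() uses `session`, the constructor parameter becomes a
// field, so serializing the writer also tries to serialize the session.
class CapturingWriter(session: FakeSession) extends Serializable {
  def process(value: String): Unit =
    session.sql(s"insert into t values ('$value')")
}

object SerializationDemo {
  // Serialize with plain Java serialization (what Spark uses for task closures)
  // and return either the failure message or the number of bytes written.
  def trySerialize(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val writer = new CapturingWriter(new FakeSession)
    println(trySerialize(writer))
  }
}
```

Running `SerializationDemo.main` prints a `Left` holding the class name of the non-serializable field, which is the same kind of information Spark reports in the `NotSerializableException` behind `Task not serializable`.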
Recommended answer
The Spark session is not serializable on the executor side; you need to broadcast the Spark session.
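As a general pattern for resources that cannot be serialized, the Spark Structured Streaming guide says a `ForeachWriter` should do its initialization (for example, opening a connection) only after `open` has been called, which happens on the executor once the writer has been deserialized. The following plain-Scala sketch only imitates that open/process/close lifecycle; `FakeConnection` and `ResourceInOpenWriter` are hypothetical classes, not Spark APIs, and the round trip through Java serialization stands in for shipping the writer from the driver to an executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical non-serializable resource (think: a database connection).
class FakeConnection {
  val written = ArrayBuffer.empty[String]
  def write(value: String): Unit = written += value
  def close(): Unit = ()
}

// Hypothetical writer with the same open/process/close shape as ForeachWriter.
// Only serializable state travels with it; the connection is created in open().
class ResourceInOpenWriter extends Serializable {
  // @transient: the connection is never serialized, even if one exists.
  @transient private var conn: FakeConnection = _

  def open(partitionId: Long, epochId: Long): Boolean = {
    conn = new FakeConnection
    true
  }

  def process(value: String): Unit = conn.write(value)

  def close(errorOrNull: Throwable): Unit = if (conn != null) conn.close()

  def written: Seq[String] = if (conn == null) Nil else conn.written.toList
}

object ResourceDemo {
  // Serialize and deserialize an object, simulating the driver-to-executor hop.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val onExecutor = roundTrip(new ResourceInOpenWriter)
    if (onExecutor.open(partitionId = 0L, epochId = 0L)) {
      Seq("a", "b").foreach(onExecutor.process)
      onExecutor.close(null)
    }
    println(onExecutor.written) // prints List(a, b)
  }
}
```

Serialization succeeds here because the writer carries no reference to a driver-only object at the moment it is shipped; the resource exists only on the receiving side.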