How to insert spark structured streaming DataFrame to Hive external table/location?
Problem Description
A question about Spark Structured Streaming integration with a Hive table.
I have tried some examples of Spark Structured Streaming.
Here is my example:
val spark = SparkSession.builder()
  .appName("StatsAnalyzer")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
  .getOrCreate()

// Register the streaming DataFrame as a temporary view
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta")
csvDF.createOrReplaceTempView("updates")
val query = spark.sql("insert into table_abcd select * from updates")
query.writeStream.start()
As you can see in the last step, while writing the data-frame to the HDFS location, the data is not getting inserted into the existing directory (my existing directory has some old data partitioned by "age").
I got:
spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
Can you help me understand why I am not able to insert data into the existing directory at the HDFS location? Or is there any other way I can do an "insert into" operation on the Hive table?
Looking for a solution.
Recommended Answer
Spark Structured Streaming does not support writing the result of a streaming query to a Hive table.
scala> println(spark.version)
2.4.0
val sq = spark.readStream.format("rate").load
scala> :type sq
org.apache.spark.sql.DataFrame
scala> assert(sq.isStreaming)
scala> sq.writeStream.format("hive").start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:246)
... 49 elided
If a target system (aka sink) is not supported you could use the foreach and foreachBatch operations (highlighting mine):
The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.
I think foreachBatch is your best bet.
import org.apache.spark.sql.DataFrame
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
// do whatever you want with your input DataFrame
// incl. writing to Hive
// I simply decided to print out the rows to the console
ds.show
}.start
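Applied to your example, the body of foreachBatch could use the ordinary batch writer to append each micro-batch to the existing Hive table. This is only a sketch: it assumes table_abcd already exists and that its column order matches the stream's schema (insertInto resolves columns by position), and it relies on the dynamic-partition settings you already put in the SparkSession config:

import org.apache.spark.sql.DataFrame

csvDF.writeStream.foreachBatch { case (batchDF: DataFrame, batchId: Long) =>
  // Inside foreachBatch the micro-batch is a plain DataFrame,
  // so the regular batch API can append it to the existing Hive table
  batchDF.write.mode("append").insertInto("table_abcd")
}.start()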
There is also the Apache Hive Warehouse Connector that I've never worked with, but it seems like it may be of some help.