How to insert a Spark Structured Streaming DataFrame into a Hive external table/location?


Problem description

A question about Spark Structured Streaming integration with a Hive table.

I have tried out some Spark Structured Streaming examples.

Here is my example:

 val spark = SparkSession.builder()
   .appName("StatsAnalyzer")
   .enableHiveSupport()
   .config("hive.exec.dynamic.partition", "true")
   .config("hive.exec.dynamic.partition.mode", "nonstrict")
   .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
   .getOrCreate()

 // Register the streaming DataFrame as a temporary view
 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta")
 csvDF.createOrReplaceTempView("updates")

 // This is the step that fails: a batch-style INSERT over the streaming view
 val query = spark.sql("insert into table_abcd select * from updates")

 query.writeStream.start()

As you can see, in the last step, while writing the data frame to the HDFS location, the data is not getting inserted into the existing directory (my existing directory has some old data partitioned by "age").

I am getting:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

Can you help me understand why I am not able to insert data into the existing directory at the HDFS location? Or is there any other way that I can do an "insert into" operation on the Hive table?

Looking for a solution.

Answer


Spark Structured Streaming does not support writing the result of a streaming query to a Hive table.

scala> println(spark.version)
2.4.0

scala> val sq = spark.readStream.format("rate").load
scala> :type sq
org.apache.spark.sql.DataFrame

scala> assert(sq.isStreaming)

scala> sq.writeStream.format("hive").start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:246)
  ... 49 elided

If a target system (aka sink) is not supported, you could use the foreach and foreachBatch operations (highlighting mine):


The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.


I think foreachBatch is your best bet.

import org.apache.spark.sql.DataFrame
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
  // do whatever you want with your input DataFrame
  // incl. writing to Hive
  // I simply decided to print out the rows to the console
  ds.show
}.start
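
Applied to the question, a minimal sketch could look like the one below. It assumes the Hive table table_abcd from the question already exists with a schema matching the CSV source, and it reuses the streaming csvDF defined above; inside foreachBatch the regular batch writer is available, so insertInto works as usual.

import org.apache.spark.sql.DataFrame

csvDF.writeStream
  .foreachBatch { case (batchDF: DataFrame, batchId: Long) =>
    // Each micro-batch arrives as a regular (non-streaming) DataFrame,
    // so the ordinary batch writer and Hive support can be used here.
    // Assumes table_abcd already exists and its schema matches the input.
    batchDF.write.mode("append").insertInto("table_abcd")
  }
  .start()

Note that foreachBatch only gives at-least-once write guarantees, so if you need exactly-once semantics you would have to use batchId yourself to de-duplicate micro-batches on reprocessing.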


There is also the Apache Hive Warehouse Connector, which I have never worked with, but it seems like it may be of some help.
