How to insert a Spark Structured Streaming DataFrame into a Hive external table/location?

Problem description

One question about Spark Structured Streaming integration with a Hive table.

I have tried out some Spark Structured Streaming examples.

Here is my example:

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType

 val spark = SparkSession.builder().appName("StatsAnalyzer")
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
     .getOrCreate()

 // Read the CSV files as a streaming DataFrame and register it as a temporary view
 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta")
 csvDF.createOrReplaceTempView("updates")
 val query = spark.sql("insert into table_abcd select * from updates")

 query.writeStream.start()

As you can see in the last step, while writing the DataFrame to the HDFS location, the data is not getting inserted into the existing directory (my existing directory already has some old data partitioned by "age").

I am getting:

 org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

Can you help me understand why I am not able to insert data into the existing directory at the HDFS location? Or is there another way to do an "insert into" operation on the Hive table?

Looking for a solution.

Answer

Spark Structured Streaming does not support writing the result of a streaming query to a Hive table.

scala> println(spark.version)
2.4.0

scala> val sq = spark.readStream.format("rate").load
scala> :type sq
org.apache.spark.sql.DataFrame

scala> assert(sq.isStreaming)

scala> sq.writeStream.format("hive").start
org.apache.spark.sql.AnalysisException: Hive data source can only be used with tables, you can not write files of Hive data source directly.;
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:246)
  ... 49 elided

If a target system (aka sink) is not supported, you could use the foreach and foreachBatch operations (highlighting mine):

The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.

I think foreachBatch is your best bet.

import org.apache.spark.sql.DataFrame
sq.writeStream.foreachBatch { case (ds: DataFrame, batchId: Long) =>
  // do whatever you want with your input DataFrame
  // incl. writing to Hive
  // I simply decided to print out the rows to the console
  ds.show
}.start
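To tie this back to the question, here is a minimal sketch (untested, and only one possible approach) of a foreachBatch body that inserts each micro-batch of the CSV stream into the existing Hive table instead of just printing it. The table name table_abcd, the input path and the schema come from the question; the checkpoint directory and the use of insertInto are my own assumptions.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder()
  .appName("StatsAnalyzer")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .getOrCreate()

// Streaming source from the question: CSV files with (name, age)
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream
  .option("sep", ",")
  .schema(userSchema)
  .csv("file:///home/su/testdelta")

csvDF.writeStream
  .option("checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db/_checkpoint") // assumed checkpoint dir
  .foreachBatch { case (batchDF: DataFrame, batchId: Long) =>
    // Each micro-batch arrives as an ordinary (non-streaming) DataFrame,
    // so the regular batch writer works here, including Hive inserts.
    batchDF.write
      .mode("append")
      .insertInto("table_abcd") // columns must match the table layout, incl. the "age" partition column
  }
  .start()

Because foreachBatch hands you a plain DataFrame, you get the full batch API back (insertInto, saveAsTable, partitionBy, and so on) that the streaming writer refuses to run directly, which is exactly the "insert into" behaviour the question asks for.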

There is also the Apache Hive Warehouse Connector, which I have never worked with, but it seems like it may be of some help.
