How to continuously monitor a directory by using Spark Structured Streaming


Problem Description

I want Spark to continuously monitor a directory and read the CSV files using spark.readStream as soon as a file appears in that directory.

Please don't suggest a Spark Streaming (DStream) solution. I am looking for a way to do it with Spark Structured Streaming.

Recommended Answer

Here is the complete solution for this use case:

If you are running in standalone mode, you can increase the driver memory like so:

bin/spark-shell --driver-memory 4G
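If you package the job as an application instead of using the shell, spark-submit accepts the same flag; a hypothetical sketch (the class name and jar path are placeholders, not from the original answer):

bin/spark-submit \
  --driver-memory 4G \
  --class com.example.CsvMonitor \
  target/csv-monitor.jar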

There is no need to set the executor memory, because in standalone mode the executor runs within the driver.

Building on the solution from @T.Gaweda, the full code is below:

import org.apache.spark.sql.types.StructType

val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)           // Specify the schema of the CSV files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")

// Print each micro-batch to the console as new files arrive
csvDF.writeStream.format("console").option("truncate", "false").start()

Now Spark will continuously monitor the specified directory, and as soon as you add any CSV file to it, the streaming query defined on csvDF will be executed against that file.
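Note that the snippet above works as-is in spark-shell because the shell keeps the driver alive. If you instead run it as a standalone application, the main thread must block on the query or the driver will exit immediately. Here is a minimal self-contained sketch under that assumption (the object name and directory path are illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object CsvMonitor {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-directory-monitor")
      .getOrCreate()

    val userSchema = new StructType()
      .add("name", "string")
      .add("age", "integer")

    val csvDF = spark.readStream
      .option("sep", ";")
      .schema(userSchema)
      .csv("/path/to/directory")

    // start() returns a StreamingQuery; awaitTermination() blocks the
    // main thread so the driver keeps running and processing new files
    val query = csvDF.writeStream
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}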

Note: If you want Spark to infer the schema, you first have to set the following configuration:

spark.sqlContext.setConf("spark.sql.streaming.schemaInference", "true")

Here, spark is your SparkSession.
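Once that flag is set, the explicit .schema(...) call can be omitted and Spark will infer the column types from the files already present in the directory when the stream starts. A minimal sketch, assuming the CSV files carry a header row:

spark.sqlContext.setConf("spark.sql.streaming.schemaInference", "true")

// With schema inference enabled, no explicit schema is required;
// Spark samples the files present at start-up to derive the types.
val inferredDF = spark.readStream
  .option("sep", ";")
  .option("header", "true")   // assumption: the files have a header row
  .csv("/path/to/directory")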

