How to read a file using Spark Streaming and write to a simple file using Scala?

Question

I'm trying to read a file using a Scala Spark Streaming program. The file is stored in a directory on my local machine, and I'm trying to write it out as a new file on the same machine. But whenever I write my stream and store it as Parquet, I end up with empty folders.

Here is my code:

 import org.apache.log4j.{Level, Logger}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

 Logger.getLogger("org").setLevel(Level.ERROR)
 val spark = SparkSession
   .builder()
   .master("local[*]")
   .appName("StreamAFile")
   .config("spark.sql.warehouse.dir", "file:///C:/temp")
   .getOrCreate()

 import spark.implicits._

 // Schema of the CSV file being read
 val schemaforfile = new StructType()
   .add("SrNo", IntegerType)
   .add("Name", StringType)
   .add("Age", IntegerType)
   .add("Friends", IntegerType)

 val file = spark.readStream.schema(schemaforfile).csv("C:\\SparkScala\\fakefriends.csv")

 file.writeStream.format("parquet").start("C:\\Users\\roswal01\\Desktop\\streamed")

 spark.stop()
 

Is anything missing from my code, or is there somewhere in it that I've gone wrong?

I also tried reading this file from an HDFS location, but the same code does not create any output folders on my HDFS either.

Recommended answer

The mistake is here:

val file = spark.readStream.schema(schemaforfile).csv("C:\\SparkScala\\fakefriends.csv")  

The csv() function should take a directory path as its argument. It scans this directory and reads each new file as it is moved into the directory.
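Because the stream picks up files as they are moved into the watched directory, one way to feed it is to write the file somewhere else first and then move it in with a single operation. The following is only a sketch of that staging step using java.nio; the names StageIntoWatchedDir and stage, and the idea of a separate staging directory, are hypothetical and not part of the original answer.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper: not part of Spark or of the original answer.
object StageIntoWatchedDir {
  /** Writes `content` to a file in `stagingDir`, then moves it into `watchedDir`. */
  def stage(stagingDir: Path, watchedDir: Path, fileName: String, content: String): Path = {
    Files.createDirectories(stagingDir)
    Files.createDirectories(watchedDir)

    // Write the complete file outside the watched directory first.
    val tmp = stagingDir.resolve(fileName)
    Files.write(tmp, content.getBytes(StandardCharsets.UTF_8))

    // ATOMIC_MOVE makes the file appear in watchedDir already fully written.
    // It requires both directories to be on the same file system; otherwise
    // an AtomicMoveNotSupportedException is thrown.
    Files.move(tmp, watchedDir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE)
  }
}
```

With this approach the directory passed to csv() would be the watched directory, and the streaming query never sees a half-written file.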

For checkpointing, you should add:

.option("checkpointLocation", "path/to/HDFS/dir")

For example:

val query = file.writeStream.format("parquet")
    .option("checkpointLocation", "path/to/HDFS/dir")
    .start("C:\\Users\\roswal01\\Desktop\\streamed") 

query.awaitTermination()
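Putting the pieces together, the whole program might look like the sketch below. It assumes Spark's Structured Streaming API on the classpath; the input and checkpoint directories (C:\\SparkScala\\input and C:\\SparkScala\\checkpoint) are hypothetical placeholders, and setLogLevel is used here in place of the log4j call from the question. Note that start() returns immediately, so the spark.stop() call from the question is replaced by awaitTermination(), which keeps the application alive while the query runs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

object StreamAFile {
  // Same columns as the schema in the question.
  val schemaForFile: StructType = new StructType()
    .add("SrNo", IntegerType)
    .add("Name", StringType)
    .add("Age", IntegerType)
    .add("Friends", IntegerType)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("StreamAFile")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // Assumption: a directory to watch, not the path of a single CSV file.
    val inputDir = "C:\\SparkScala\\input"
    // Assumption: any writable directory can hold the checkpoint data.
    val checkpointDir = "C:\\SparkScala\\checkpoint"

    // File sources need an explicit schema when read as a stream.
    val file = spark.readStream.schema(schemaForFile).csv(inputDir)

    val query = file.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointDir)
      .start("C:\\Users\\roswal01\\Desktop\\streamed")

    // Block until the query is stopped or fails, instead of stopping
    // the session right after start().
    query.awaitTermination()
  }
}
```

Once this is running, copying or moving fakefriends.csv into the input directory should cause Parquet files to appear in the output folder.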
