Spark Structured Streaming not writing data while reading from HDFS


Problem Description

I am working on a streaming script which should pick up files as soon as they land on HDFS, aggregate them, and write them somewhere else.

Here, I cannot get the write to work - it creates the metadata folder, but no actual writing happens. Of 10+ files (all with the same structure), only one was written, and I am not sure why.

Can someone help me?

from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType

now = datetime.now()

# create a session that supports Hive
def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session

### START MAIN ###
if __name__ == '__main__':
    spark_session = create_session('streaming_monitor')
    print('start')
    print(datetime.now())

    # each input file is read as lines of text
    myschema = StructType([
      StructField('text', StringType())
    ])

    # only pick up files that arrive after the stream starts
    df = spark_session\
        .readStream\
        .option('newFilesOnly', 'true')\
        .option('header', 'true')\
        .schema(myschema)\
        .text('hdfs://nameservice/user/user1/streamtest/')\
        .withColumn("FileName", input_file_name())

    df.createOrReplaceTempView('log')
    #hive_dump = spark_session.sql("select '" + str(now) + "' as timestamp, FileName, did_it_error, solution, text from log")

    # write the stream out as CSV and block until the query terminates
    output = df\
        .writeStream\
        .format("csv")\
        .queryName('logsmonitor')\
        .option("checkpointLocation", "file:///home/user1/analytics/logs/chkpoint_dir")\
        .start('hdfs://nameservice/user/user1/streamtest/output/')\
        .awaitTermination()

Recommended Answer

What you are observing here is that files read by Spark Structured Streaming have to be placed into the source folder atomically. Otherwise, a file will be read as soon as it is created (and possibly without any content yet). Spark will not act on data that is later updated within a file; it looks at each file exactly once.

You will see all of your data show up if you (a rough sketch of these steps in code follows the list):

  • stop the streaming job
  • delete the checkpoint directory (or rename all input files to new, unique names)
  • move all files into the source folder
  • wait for the move to finish
  • start the streaming application
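A minimal sketch of that reset procedure, assuming the checkpoint and source paths from the question, an hdfs CLI available on the machine running it, and a hypothetical staging directory:

import shutil
import subprocess

# 1. stop the streaming job first (query.stop() or kill the application)

# 2. delete the (local) checkpoint directory used in the question
shutil.rmtree('/home/user1/analytics/logs/chkpoint_dir', ignore_errors=True)

# 3. move the already-complete input files into the watched source folder in one go;
#    a rename within the same HDFS filesystem is atomic, so the stream never sees
#    half-written files ('streamtest_staging' is a made-up path for illustration)
subprocess.check_call([
    'hdfs', 'dfs', '-mv',
    '/user/user1/streamtest_staging/*',
    '/user/user1/streamtest/',
])

# 4. check_call blocks until the move finishes; now restart the streaming application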

Of course, this will not be a solution if you want to let this job run continuously while adding more and more files, but the secret really is in placing the files atomically, all at once, into the folder.

I am not completely familiar with HDFS, but usually this atomicity can be achieved by writing the data into another folder and then moving it into the source folder.
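As a concrete illustration of that write-then-move pattern, here is a hedged sketch; the staging path and the publish helper are hypothetical, and it assumes the hdfs CLI is installed where the producer runs:

import os
import subprocess

STAGING = '/user/user1/streamtest_staging'   # hypothetical staging directory
WATCHED = '/user/user1/streamtest'           # directory monitored by the stream

def publish(local_path):
    """Upload a local file to staging, then atomically move it into the watched folder."""
    name = os.path.basename(local_path)
    subprocess.check_call(['hdfs', 'dfs', '-put', local_path, STAGING + '/' + name])
    # the move is a single metadata operation within HDFS, so the streaming
    # query only ever discovers complete files
    subprocess.check_call(['hdfs', 'dfs', '-mv', STAGING + '/' + name, WATCHED + '/' + name])

Any mechanism with the same effect, such as writing the file under a temporary name and renaming it afterwards, works just as well.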

Here is the relevant part of the documentation for the file input source:

" <文件源-读取写入目录的文件作为数据流.文件将按照文件修改时间的顺序进行处理.如果设置了laststFirst,则顺序将相反.支持的文件格式为文本,CSV,JSON,ORC,Parquet.有关最新列表以及每种文件格式支持的选项,请参见DataStreamReader界面的文档.请注意,文件必须原子放置在给定目录中,在大多数文件系统中,这可以通过文件移动操作来实现."

"File source - Reads files written in a directory as a stream of data. Files will be processed in the order of file modification time. If latestFirst is set, order will be reversed. Supported file formats are text, CSV, JSON, ORC, Parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations."
