How to print out Structured Stream in Console format


Problem description

I am learning Structured Streaming with Databricks and I'm struggling with the DataStreamWriter console mode.

My program:

  • Simulates a stream of files arriving in the folder "monitoring_dir" (one new file is transferred from "source_dir" every 10 seconds).
  • Uses DataStreamReader to fill an unbounded DataFrame "inputUDF" with the content of each new file.
  • Uses DataStreamWriter to output the new rows of "inputUDF" to a valid sink.

Whereas the program works when choosing the File sink (the batches are appended to text-format files in "result_dir"), I cannot see anything displayed when choosing the Console sink.

Moreover, when I run the equivalent version of the program on my local machine (with Spark installed on it), it works fine for both the File and Console sinks.

My question is:

  • How can I make this program output to the Console sink and display the results when using Databricks?

Thank you very much!

Best regards

import pyspark
import pyspark.sql.functions

import time

#------------------------------------
# FUNCTION get_source_dir_file_names
#------------------------------------ 
def get_source_dir_file_names(source_dir):

    # 1. We create the output variable
    res = []

    # 2. We get the FileInfo representation of the files of source_dir
    fileInfo_objects = dbutils.fs.ls(source_dir)

    # 3. We traverse the fileInfo objects, to get the name of each file
    for item in fileInfo_objects:      
        # 3.1. We get a string representation of the fileInfo
        file_name = str(item)

        # 3.2. We look for the pattern name= to remove all useless info from the start
        lb_index = file_name.index("name='")
        file_name = file_name[(lb_index + 6):]

        # 3.3. We look for the pattern "'," to remove all useless info from the end
        ub_index = file_name.index("',")
        file_name = file_name[:ub_index]

        # 3.4. We append the name to the list
        res.append(file_name)

    # 4. We sort the list in alphabetic order
    res.sort()

    # 5. We return res
    return res

#------------------------------------
# FUNCTION streaming_simulation
#------------------------------------ 
def streaming_simulation(source_dir, monitoring_dir, time_step_interval):
    # 1. We get the names of the files on source_dir
    files = get_source_dir_file_names(source_dir)

    # 2. We get the starting time of the process
    time.sleep(time_step_interval * 0.1)

    start = time.time()

    # 3. We set a counter for the number of files being transferred
    count = 0

    # 4. We simulate the dynamic arrival of these files from source_dir to monitoring_dir
    # (i.e., the files are copied one by one, one per time period, simulating their generation).
    for file in files:
        # 4.1. We copy the file from source_dir to monitoring_dir
        dbutils.fs.cp(source_dir + file, monitoring_dir + file)

        # 4.2. We increase the counter, as we have transferred a new file
        count = count + 1

        # 4.3. We wait the desired transfer_interval until next time slot.
        time.sleep((start + (count * time_step_interval)) - time.time())

    # 5. We wait a last time_step_interval
    time.sleep(time_step_interval)

#------------------------------------
# FUNCTION my_main
#------------------------------------ 
def my_main():
    # 0. We set the mode
    console_sink = True

    # 1. We set the paths to the folders
    source_dir = "/FileStore/tables/my_dataset/"
    monitoring_dir = "/FileStore/tables/my_monitoring/"
    checkpoint_dir = "/FileStore/tables/my_checkpoint/"
    result_dir = "/FileStore/tables/my_result/"

    dbutils.fs.rm(monitoring_dir, True)
    dbutils.fs.rm(result_dir, True)
    dbutils.fs.rm(checkpoint_dir, True)  

    dbutils.fs.mkdirs(monitoring_dir)
    dbutils.fs.mkdirs(result_dir)
    dbutils.fs.mkdirs(checkpoint_dir)    

    # 2. We configure the Spark Session
    spark = pyspark.sql.SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel('WARN')    

    # 3. Operation C1: We create an Unbounded DataFrame reading the new content copied to monitoring_dir
    inputUDF = spark.readStream.format("text")\
                               .load(monitoring_dir)

    myDSW = None
    # 4. Operation A1: We create the DataStreamWriter...

    # 4.1. To either save to result_dir in append mode  
    if console_sink == False:
        myDSW = inputUDF.writeStream.format("text")\
                                    .option("path", result_dir) \
                                    .option("checkpointLocation", checkpoint_dir)\
                                    .trigger(processingTime="10 seconds")\
                                    .outputMode("append")   
    # 4.2. Or to display by console in append mode    
    else:
        myDSW = inputUDF.writeStream.format("console")\
                                    .trigger(processingTime="10 seconds")\
                                    .outputMode("append")   

    # 5. We get the StreamingQuery object derived from starting the DataStreamWriter
    mySQ = myDSW.start()

    # 6. We simulate the streaming arrival of files (i.e., one by one) from source_dir to monitoring_dir
    streaming_simulation(source_dir, monitoring_dir, 10)

    # 7. We stop the StreamingQuery to finish the application
    mySQ.stop()    

#-------------------------------
# MAIN ENTRY POINT
#-------------------------------
if __name__ == '__main__':
    my_main()


My dataset: f1.txt

First sentence.

Second sentence.

Third sentence.

Fourth sentence.

Fifth sentence.

Sixth sentence.

Solution

"How can I make this program output to the Console sink and display the results when using Databricks?"

The easiest way is to use the display function that Databricks provides. You can use it as shown below:

# Cell 1
rateDf = (spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 1)
  .load())

# Cell 2
display(rateDf, streamName="rate_stream")
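
The same approach should also work for the question's text stream; a minimal sketch, assuming the monitoring directory from my_main (the streamName label is arbitrary):

# Cell 1: read the text files arriving in the monitored folder
inputUDF = (spark.readStream
  .format("text")
  .load("/FileStore/tables/my_monitoring/"))

# Cell 2: display() renders the streaming result live in the notebook cell
display(inputUDF, streamName="input_stream")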

The Console sink does not work in Databricks the way it does in your IDE or when submitting the application to a cluster. Instead, you can use the memory format and query the data with a %sql query:

inputUDF.writeStream \
  .format("memory") \
  .trigger(processingTime = "10 seconds") \
  .queryName("inputUDF_console") \
  .outputMode("append") \
  .start()  
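
Note that start() returns a StreamingQuery handle, which the snippet above discards. If you want to stop the query once the file simulation is done, as my_main does with mySQ.stop(), keep a reference to it; a small sketch reusing the names from the question:

# Keep the StreamingQuery handle so the query can be stopped later
mySQ = inputUDF.writeStream \
  .format("memory") \
  .trigger(processingTime = "10 seconds") \
  .queryName("inputUDF_console") \
  .outputMode("append") \
  .start()

# Run the file-arrival simulation, then stop the query
streaming_simulation(source_dir, monitoring_dir, 10)
mySQ.stop()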

In another Databricks cell you can then inspect the data by querying the table registered under the queryName:

%sql select * from inputUDF_console
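
If you prefer to stay in Python, the same in-memory table can also be read with spark.sql in a regular Python cell (a sketch; the table name is the queryName set above):

# Query the in-memory table from a Python cell instead of %sql
spark.sql("SELECT * FROM inputUDF_console").show(truncate=False)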
