How to see the dataframe in the console (equivalent of .show() for structured streaming)?


Question

I'm trying to see what's coming in as my DataFrame.

Here is the Spark code:

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
import logging
import time

spark = SparkSession \
    .builder \
    .appName("Console Example") \
    .getOrCreate()

logging.info("started to listen to the host..")

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load()

data = lines.selectExpr("CAST(value AS STRING)")
query1 = data.writeStream.format("console").start()
time.sleep(10)
query1.awaitTermination()

I am getting the progress reports, but the input rows are obviously 0 for each trigger:

2019-08-19 23:45:45 INFO  MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "a4b26eaf-1032-4083-9e42-a9f2f0426eb7",
  "runId" : "35c2b82a-191d-4998-9c98-17b24f5e3e9d",
  "name" : null,
  "timestamp" : "2019-08-20T06:45:45.458Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 0
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "TextSocketSource[host: 127.0.0.1, port: 9999]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@5f3e6f3"
  }
}
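
As an aside, the same progress information is also available from the driver through the query handle; a minimal sketch, assuming the query1 handle from the script above and using the standard lastProgress/status properties of a PySpark StreamingQuery:

import json
import time

time.sleep(5)                      # give the first micro-batch a chance to run
progress = query1.lastProgress     # most recent progress report as a Python dict, or None
if progress is not None:
    print("numInputRows:", progress["numInputRows"])
    print(json.dumps(progress, indent=2))
print(query1.status)               # e.g. {'message': 'Waiting for data to arrive', ...}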

My TCP server is spitting some stuff out, and I can see it in the console too - but I just want to make sure my Spark job is actually receiving anything by printing it out, and that is proving difficult to do.

This is my TCP server code.

import socket
import sys
import csv
import time


port = 9999
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('', port))
server_socket.listen(5)
connection_socket, addr = server_socket.accept()

file_path = "/Users/Downloads/youtube-new/USvideos.csv"
# Count the lines in the file (not the characters of the path string)
with open(file_path, "r") as f:
    row_count = sum(1 for row in f)

with open(file_path, "r") as f:
    reader = csv.reader(f, delimiter="\t")
    while True:
        for i, line in enumerate(reader):
            try:
                print(line)
                data = line[0].encode('utf-8')
                connection_socket.send(data)
                time.sleep(2)
                if i == row_count - 1:   # stop after the last row of the file
                    break
            except IndexError:
                print("Index error")
                server_socket.close()

server_socket.close()

I can see the lines getting printed out, so I can at least say that the server has accepted a connection at localhost:9999, which is the host and port I'm using for the Spark job as well.

Here is one of the data rows:

['8mhTWqWlQzU,17.15.11,"Wearing Online Dollar Store Makeup For A Week","Safiya Nygaard",22,2017-11-11T01:19:33.000Z,"wearing online dollar store makeup for a week"|"online dollar store makeup"|"dollar store makeup"|"daiso"|"shopmissa makeup"|"shopmissa haul"|"dollar store makeup haul"|"dollar store"|"shopmissa"|"foundation"|"concealer"|"eye primer"|"eyebrow pencil"|"eyeliner"|"bronzer"|"contour"|"face powder"|"lipstick"|"$1"|"$1 makeup"|"safiya makeup"|"safiya dollar store"|"safiya nygaard"|"safiya"|"safiya and tyler",2922523,119348,1161,6736,https://i.ytimg.com/vi/8mhTWqWlQzU/default.jpg,False,False,False,"I found this online dollar store called ShopMissA that sells all their makeup products for $1 and decided I had to try it out! So I replaced my entire everyday makeup routine with $1 makeup products, including foundation, concealer, eye primer, eyebrow pencil, eyeliner, bronzer, contour, face powder, and lipstick. What do you think? Would you try this?\\n\\nThis video is NOT sponsored!\\n\\nSafiya\'s Nextbeat: https://nextbeat.co/u/safiya\\nIG: https://www.instagram.com/safiyany/\\nTwitter: https://twitter.com/safiyajn\\nFacebook: https://www.facebook.com/safnygaard/\\n\\nAssistant Editor: Claire Wiley\\n\\nMUSIC\\nMind The Gap\\nvia Audio Network\\n\\nSFX\\nvia AudioBlocks"']

Everything in the brackets (notice I'm actually sending data[0]).
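
Side note: since the server above only accepts a single connection, one way to double-check what is actually going over the wire (independently of Spark, and before starting the Spark job) is a small throwaway client; a minimal sketch, assuming the server is listening on 127.0.0.1:9999 as above:

import socket

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("127.0.0.1", 9999))
try:
    while True:
        chunk = client.recv(4096)                       # raw bytes as sent by connection_socket.send()
        if not chunk:                                   # server closed the connection
            break
        print(chunk.decode("utf-8", errors="replace"))
finally:
    client.close()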

Answer

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
import logging
import time

spark = SparkSession \
    .builder \
    .appName("Console Example") \
    .getOrCreate()

logging.info("started to listen to the host..")

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load()

data = lines.selectExpr("CAST(value AS STRING)")

# Write to an in-memory table named "counting" instead of the console sink
query1 = data.writeStream.queryName("counting").format("memory").outputMode("append").start()

# Query the in-memory table a few times and print its current contents
for x in range(5):
    spark.sql("select * from counting").show()
    time.sleep(10)

Try this; it will show you the data just as the show() method does in Spark SQL. It will show you 5 snapshots of the data, since we loop five times.
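
As a follow-up (not from the original answer): once the loop finishes, the in-memory query can be stopped explicitly, and the original console sink also works as long as the driver stays alive; a minimal sketch using the standard truncate/numRows options of the console sink:

query1.stop()                                   # clean up the "counting" memory query

console_query = (data.writeStream
                 .format("console")
                 .option("truncate", False)     # print full column values instead of truncating
                 .option("numRows", 20)         # rows to print per micro-batch
                 .outputMode("append")
                 .start())
console_query.awaitTermination()                # block so that each micro-batch keeps printing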
