How to see the DataFrame in the console (equivalent of .show() for Structured Streaming)?

Problem Description

I'm trying to see what's coming in as my DataFrame.

Here is the Spark code:

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
import logging
import time

spark = SparkSession \
    .builder \
    .appName("Console Example") \
    .getOrCreate()

logging.info("started to listen to the host..")

# Read newline-delimited text from the TCP socket as a streaming DataFrame
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load()

data = lines.selectExpr("CAST(value AS STRING)")
# Write each micro-batch to the console sink
query1 = data.writeStream.format("console").start()
time.sleep(10)
query1.awaitTermination()

I am getting the progress reports, but obviously the input rows are 0 for each trigger:

2019-08-19 23:45:45 INFO  MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "a4b26eaf-1032-4083-9e42-a9f2f0426eb7",
  "runId" : "35c2b82a-191d-4998-9c98-17b24f5e3e9d",
  "name" : null,
  "timestamp" : "2019-08-20T06:45:45.458Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 0
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "TextSocketSource[host: 127.0.0.1, port: 9999]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@5f3e6f3"
  }
}

My TCP server is spitting some stuff out, and I can see it in the console too. I just want to confirm that my Spark job is receiving anything by printing it out, but that is proving difficult.

This is my TCP server code:

import socket
import sys
import csv
import time


port = 9999
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('', port))
server_socket.listen(5)
connection_socket, addr = server_socket.accept()

file_path = "/Users/Downloads/youtube-new/USvideos.csv"
# Count the rows in the file itself (iterating over the path string
# would only count its characters)
with open(file_path, "r") as f:
    row_count = sum(1 for _ in f)

with open(file_path, "r") as f:
    reader = csv.reader(f, delimiter="\t")
    while True:
        for i, line in enumerate(reader):
            try:
                print(line)
                # Send only the first field of each CSV row
                data = line[0].encode('utf-8')
                connection_socket.send(data)
                time.sleep(2)
                if i == row_count - 1:
                    break
            except IndexError:
                print("Index error")
                server_socket.close()

server_socket.close()

I can see the line getting printed out, so I can at least say that the server has accepted a connection at localhost:9999, which is the host and port I'm using for the Spark job as well.

Here is one of the data rows:

['8mhTWqWlQzU,17.15.11,"Wearing Online Dollar Store Makeup For A Week","Safiya Nygaard",22,2017-11-11T01:19:33.000Z,"wearing online dollar store makeup for a week"|"online dollar store makeup"|"dollar store makeup"|"daiso"|"shopmissa makeup"|"shopmissa haul"|"dollar store makeup haul"|"dollar store"|"shopmissa"|"foundation"|"concealer"|"eye primer"|"eyebrow pencil"|"eyeliner"|"bronzer"|"contour"|"face powder"|"lipstick"|"$1"|"$1 makeup"|"safiya makeup"|"safiya dollar store"|"safiya nygaard"|"safiya"|"safiya and tyler",2922523,119348,1161,6736,https://i.ytimg.com/vi/8mhTWqWlQzU/default.jpg,False,False,False,"I found this online dollar store called ShopMissA that sells all their makeup products for $1 and decided I had to try it out! So I replaced my entire everyday makeup routine with $1 makeup products, including foundation, concealer, eye primer, eyebrow pencil, eyeliner, bronzer, contour, face powder, and lipstick. What do you think? Would you try this?\\n\\nThis video is NOT sponsored!\\n\\nSafiya\'s Nextbeat: https://nextbeat.co/u/safiya\\nIG: https://www.instagram.com/safiyany/\\nTwitter: https://twitter.com/safiyajn\\nFacebook: https://www.facebook.com/safnygaard/\\n\\nAssistant Editor: Claire Wiley\\n\\nMUSIC\\nMind The Gap\\nvia Audio Network\\n\\nSFX\\nvia AudioBlocks"']

Everything in the brackets (notice I'm actually sending data[0]).

Recommended Answer

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
import logging
import time

spark = SparkSession \
    .builder \
    .appName("Console Example") \
    .getOrCreate()

logging.info("started to listen to the host..")

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load()

data = lines.selectExpr("CAST(value AS STRING)")
# Write the stream to an in-memory table named "counting" instead of the console sink
query1 = data.writeStream.queryName("counting").format("memory").outputMode("append").start()
for x in range(5):
    # Query the in-memory table and display it, just like show() on a batch DataFrame
    spark.sql("select * from counting").show()
    time.sleep(10)

Try this; it will show you the data just as the show() method does in Spark SQL. It will display five sets of data, since we loop five times.
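If you would rather keep the console sink from the original code, here is a minimal sketch of that route (assuming the job is launched with spark-submit, so the driver's stdout is visible in your terminal): block on awaitTermination() instead of sleeping, and turn off truncation so the long CSV rows are printed in full.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Console Example") \
    .getOrCreate()

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load()

data = lines.selectExpr("CAST(value AS STRING)")

# Console sink: every micro-batch is printed to the driver's stdout.
# "truncate" = False keeps long values from being cut off at 20 characters.
query = data.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start()

query.awaitTermination()

Note that the console sink writes to the driver's stdout, so the batches only appear in the terminal where the driver is running; the memory-sink approach above avoids that by letting you pull the rows back through spark.sql().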
