Spark Structured Streaming using sockets, set SCHEMA, Display DATAFRAME in console


Problem Description

How can I set a schema for a streaming DataFrame in PySpark?

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Import data types
from pyspark.sql.types import *

spark = SparkSession\
    .builder\
    .appName("StructuredNetworkWordCount")\
    .getOrCreate()

# Create DataFrame representing the stream of input lines from connection to 192.168.0.113:5560
lines = spark\
   .readStream\
   .format('socket')\
   .option('host', '192.168.0.113')\
   .option('port', 5560)\
   .load()

For example I need a table like:

Name,  lastName,   PhoneNumber    
Bob, Dylan, 123456    
Jack, Ma, 789456
....

How can I set the header/schema to ['Name','lastName','PhoneNumber'] with their data types?

Also, is it possible to display this table continuously, or say the top 20 rows of the DataFrame? When I tried, I got the error:

"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"

Solution

TextSocketSource doesn't provide any integrated parsing options. It is only possible to use one of the two formats:

  • timestamp and text if includeTimestamp is set to true with the following schema:

StructType([
    StructField("value", StringType()),
    StructField("timestamp", TimestampType())
])

  • text only if includeTimestamp is set to false with the schema as shown below:

    StructType([StructField("value", StringType())])
    

    If you want to change this format you'll have to transform the stream to extract fields of interest, for example with regular expressions:

    from pyspark.sql.functions import regexp_extract
    from functools import partial
    
    fields = partial(
        regexp_extract, str="value", pattern=r"^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
    )
    
    parsed = lines.select(
        fields(idx=1).alias("name"),
        fields(idx=2).alias("last_name"), 
        fields(idx=3).alias("phone_number")
    )
    
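As a quick, Spark-free sanity check, the same pattern can be exercised locally with Python's standard `re` module before wiring it into the stream (a minimal sketch; the sample rows are taken from the question above):

```python
import re

# Same pattern as in the regexp_extract call above:
# group 1 = name, group 2 = last name, group 3 = phone number
pattern = re.compile(r"^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$")

# Sample rows from the question
rows = ["Bob, Dylan, 123456", "Jack, Ma, 789456"]

records = []
for row in rows:
    m = pattern.match(row)
    if m:
        records.append({
            "name": m.group(1),
            "last_name": m.group(2),
            "phone_number": int(m.group(3)),  # cast to a numeric type
        })

print(records)
```

On the Spark side, the extracted `phone_number` column can likewise be given a numeric type with `Column.cast`, e.g. `regexp_extract("value", ..., 3).cast("long")`.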
