Spark Structured Streaming using sockets, set SCHEMA, display DATAFRAME in console
Question
How can I set a schema for a streaming DataFrame in PySpark?
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
# Import data types
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Create DataFrame representing the stream of input lines from the socket at 192.168.0.113:5560
lines = spark \
    .readStream \
    .format('socket') \
    .option('host', '192.168.0.113') \
    .option('port', 5560) \
    .load()
For example, I need a table like:
Name, lastName, PhoneNumber
Bob, Dylan, 123456
Jack, Ma, 789456
....
How can I set the header/schema to ['Name', 'lastName', 'PhoneNumber'] with their data types?
Also, is it possible to display this table continuously, or say the top 20 rows of the DataFrame? When I tried it, I got the error:

"pyspark.sql.utils.AnalysisException: 'Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;\nProject"
Answer
TextSocketSource doesn't provide any integrated parsing options. It is only possible to use one of two formats:
timestamp and text, if includeTimestamp is set to true, with the following schema:
StructType([
    StructField("value", StringType()),
    StructField("timestamp", TimestampType())
])
text only, if includeTimestamp is set to false, with the following schema:
StructType([StructField("value", StringType())])
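As a minimal sketch of switching between the two formats (assuming the `spark` session and the host/port from the question; `includeTimestamp` is a standard option of the socket source), the timestamped schema is obtained simply by setting the option on the source:

```python
# Same socket source as in the question, but with includeTimestamp enabled,
# so each row carries (value, timestamp) instead of just (value).
# Host and port are the question's values, not anything required by the API.
lines_ts = spark \
    .readStream \
    .format("socket") \
    .option("host", "192.168.0.113") \
    .option("port", 5560) \
    .option("includeTimestamp", True) \
    .load()
```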
If you want to change this format, you'll have to transform the stream to extract the fields of interest, for example with regular expressions:
from functools import partial
from pyspark.sql.functions import regexp_extract

# Bind the column name and pattern once; each call then only needs the group index.
fields = partial(
    regexp_extract, str="value", pattern=r"^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$"
)

parsed = lines.select(
    fields(idx=1).alias("name"),
    fields(idx=2).alias("last_name"),
    fields(idx=3).alias("phone_number")
)
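The extraction pattern itself can be sanity-checked locally with Python's re module before running the stream; a small sketch using the sample rows from the question (parse_line is a hypothetical helper, not part of the Spark job):

```python
import re

# The same pattern passed to regexp_extract above.
PATTERN = re.compile(r"^(\w*)\s*,\s*(\w*)\s*,\s*([0-9]*)$")

def parse_line(line):
    """Return (name, last_name, phone_number), or None if the line doesn't match."""
    m = PATTERN.match(line)
    return m.groups() if m else None

print(parse_line("Bob, Dylan, 123456"))   # ('Bob', 'Dylan', '123456')
print(parse_line("Jack, Ma, 789456"))     # ('Jack', 'Ma', '789456')
print(parse_line("not,a valid,row!"))     # None - spaces and '!' break the match
```

As for displaying the result continuously: since there is no streaming aggregation here, write the extracted columns to the console sink with .writeStream.outputMode("append").format("console").start() - append mode is what avoids the "Complete output mode not supported" error from the question.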