Spark structured streaming with Python


Problem description

I am trying to use Spark structured streaming with Kafka and Python. Requirement: I need to process streaming data from Kafka (in JSON format) in Spark (perform transformations) and then store it in a database.

I have data in JSON format like: {"a": 120.56, "b": 143.6865998138807, "name": "niks", "time": "2012-12-01 00:00:09"}

I am planning to use spark.readStream to read from Kafka, like:

data = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

I referred to this link for reference, but didn't get how to parse the JSON data. I tried this:

data = data.selectExpr("CAST(a AS FLOAT)","CAST(b as FLOAT)", "CAST(name as STRING)", "CAST(time as STRING)").as[(Float, Float, String, String)]

But it doesn't seem to work.

Can anyone who has worked on Spark structured streaming with Python guide me with sample examples or links?

Using:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType

# Schema matching the JSON payload.
schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()),
    StructField("name", StringType()),
    StructField("time", TimestampType())])

# Read from Kafka and parse the value column as JSON.
inData = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()
data = inData.select(from_json(col("value").cast("string"), schema))
query = data.writeStream.outputMode("append").format("console").start()

The program runs, but I am getting values on the console like this:

+-----------------------------------+
|jsontostruct(CAST(value AS STRING))|
+-----------------------------------+
|               [null,null,null,2...|
|               [null,null,null,2...|
+-----------------------------------+

17/04/07 19:23:15 INFO StreamExecution: Streaming query made progress: {
  "id" : "8e2355cb-0fd3-4233-89d8-34a855256b1e",
  "runId" : "9fc462e0-385a-4b05-97ed-8093dc6ef37b",
  "name" : null,
  "timestamp" : "2017-04-07T19:23:15.013Z",
  "numInputRows" : 2,
  "inputRowsPerSecond" : 125.0,
  "processedRowsPerSecond" : 12.269938650306749,
  "durationMs" : {
    "addBatch" : 112,
    "getBatch" : 8,
    "getOffset" : 2,
    "queryPlanning" : 4,
    "triggerExecution" : 163,
    "walCommit" : 26
  },
  "eventTime" : {
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[test]]",
    "startOffset" : {
      "test" : {
        "0" : 366
      }
    },
    "endOffset" : {
      "test" : {
        "0" : 368
      }
    },
    "numInputRows" : 2,
    "inputRowsPerSecond" : 125.0,
    "processedRowsPerSecond" : 12.269938650306749
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6aa91aa2"
  }
}

Did I miss something here?

Answer

You can either use from_json with a schema:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType([
    StructField("a", DoubleType()),
    StructField("b", DoubleType()), 
    StructField("name", StringType()), 
    StructField("time", TimestampType())])

data.select(from_json(col("value").cast("string"), schema))
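
Note that this produces a single struct-typed column, which is exactly the jsontostruct(CAST(value AS STRING)) column seen in the console output above. A minimal sketch of aliasing the struct and flattening it into top-level columns (the name parsed is just illustrative):

from pyspark.sql.functions import col, from_json

parsed = data.select(
    from_json(col("value").cast("string"), schema).alias("parsed"))

# Expand the struct so each JSON field becomes its own column: a, b, name, time.
flat = parsed.select("parsed.*")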

or get individual fields as strings with get_json_object:

from pyspark.sql.functions import get_json_object

data.select([
    get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["a", "b", "name", "time"]])

and cast them later according to your needs.
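
For example, a minimal sketch of that later cast, assuming the get_json_object result above is assigned to a DataFrame (strings is just an illustrative name):

from pyspark.sql.functions import col, get_json_object

strings = data.select([
    get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
    for c in ["a", "b", "name", "time"]])

# get_json_object returns strings; cast each field to the type you actually need.
typed = strings.select(
    col("a").cast("double"),
    col("b").cast("double"),
    col("name"),
    col("time").cast("timestamp"))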

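Finally, for the "store it in a database" part of the original requirement: structured streaming has no built-in JDBC sink, so one common approach is foreachBatch (available from Spark 2.4 onward), which hands each micro-batch to ordinary batch code. A hedged sketch; the JDBC URL, table name, and credentials are placeholders, and typed refers to the parsed DataFrame from the sketch above:

def write_to_db(batch_df, batch_id):
    # Plain batch JDBC write per micro-batch; all connection details are placeholders
    # and the appropriate JDBC driver must be on the classpath.
    (batch_df.write
        .format("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/mydb")
        .option("dbtable", "events")
        .option("user", "user")
        .option("password", "password")
        .mode("append")
        .save())

query = typed.writeStream.foreachBatch(write_to_db).start()
query.awaitTermination()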