如何使用Java中的结构化流反序列化来自Kafka的记录? [英] How to deserialize records from Kafka using Structured Streaming in Java?

查看:146
本文介绍了如何使用Java中的结构化流反序列化来自Kafka的记录?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用Spark 2.1 .

我正在尝试使用Spark结构化流技术从Kafka中读取记录,对其进行反序列化,然后再应用聚合.

I am trying to read records from Kafka using Spark Structured Streaming, deserialize them and apply aggregations afterwards.

我有以下代码:

SparkSession spark = SparkSession
        .builder()
        .appName("Statistics")
        .getOrCreate();

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaUri)
        .option("subscribe", "Statistics")
        .option("startingOffsets", "earliest")
        .load();

df.selectExpr("CAST(value AS STRING)")

我想要的是将value字段反序列化到我的对象中,而不是强制转换为String.

What I want is to deserialize the value field into my object instead of casting as String.

我为此有一个自定义反序列化器.

I have a custom deserializer for this.

public StatisticsRecord deserialize(String s, byte[] bytes)

如何用Java做到这一点?

How can I do this in Java?

我找到的唯一相关链接是这个

The only relevant link I have found is this https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html, but this is for Scala.

推荐答案

为您的JSON消息定义架构.

Define schema for your JSON messages.

StructType schema = DataTypes.createStructType(new StructField[] { 
                DataTypes.createStructField("Id", DataTypes.IntegerType, false),
                DataTypes.createStructField("Name", DataTypes.StringType, false),
                DataTypes.createStructField("DOB", DataTypes.DateType, false) });

现在阅读如下消息. MessageData是用于JSON消息的JavaBean.

Now read Messages like below. MessageData is JavaBean for your JSON message.

Dataset<MessageData> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaUri)
            .option("subscribe", "Statistics")
            .option("startingOffsets", "earliest")
            .load()
            .selectExpr("CAST(value AS STRING) as message")
            .select(functions.from_json(functions.col("message"),schema).as("json"))
            .select("json.*")
            .as(Encoders.bean(MessageData.class));  

这篇关于如何使用Java中的结构化流反序列化来自Kafka的记录?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆