使用 Spark 结构化流读取 protobuf kafka 消息 [英] Read protobuf kafka message using spark structured streaming

本文介绍了使用 Spark 结构化流读取 protobuf kafka 消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以使用 Spark 结构化流从 kafka 读取 protobuf 消息?

Is it possible to read protobuf message from kafka using spark structured streaming?

推荐答案

方法一

sparkSession.udf().register("deserialize", getDeserializer(), schema);

    DataStreamReader dataStreamReader = sparkSession.readStream().format("kafka");

    for (Map.Entry<String, String> kafkaPropEntry : kafkaProps.entrySet()) {
        dataStreamReader.option(kafkaPropEntry.getKey(), kafkaPropEntry.getValue());
    }

    Dataset<Row> kafkaRecords =
            dataStreamReader.load()
                    .selectExpr("deserialize(value) as event").select("event.*");

方法 2

final StructType schema = getSchema();

    DataStreamReader dataStreamReader = sparkSession.readStream().format("kafka");

    for (Map.Entry<String, String> kafkaPropEntry : kafkaProps.entrySet()) {
        dataStreamReader.option(kafkaPropEntry.getKey(), kafkaPropEntry.getValue());
    }

    Dataset<Row> kafkaRecords = dataStreamReader.load()
            .map(row -> getOutputRow((byte[]) row.get(VALUE_INDEX)), RowEncoder.apply(schema))

方法 1 有一个缺陷,因为反序列化方法被多次调用(对于事件中的 evert 列)https://issues.apache.org/jira/browse/SPARK-17728.方法二直接使用map方法将protobuf映射到row.

Approach 1 has one flaw as deserialize method is called multiple times (for evert column in event) https://issues.apache.org/jira/browse/SPARK-17728. Approach 2 maps protobuf to row directly using map method.

这篇关于使用 Spark 结构化流读取 protobuf kafka 消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆