电光为什么在三角洲湖表中写空 [英] Why Spark writes Null in DeltaLake Table

查看:40
本文介绍了电光为什么在三角洲湖表中写空的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的Java应用程序使用连接到套接字服务器Spark Structured Streaming不断获取封装在RDMessage对象中的传感器测量记录(IoT),该对象记录协议中用于控制的消息类型。 当消息到达时,将使用Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class)检查它们并将其转换为数据集。

虽然流被正确读取并且RDMeasurement对象被正确创建,但是输出流被设置为无或零,具体取决于数据类型。当我更改格式(.format("console"))时,我在DeltaFrame表或控制台中看到这一点。

我这里错过了什么?出什么问题了?

请参阅下面最重要的Java代码段

public final class SocketRDMeasurement {

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("SSSocketRDMeasurement")
                .master("local[*]")
                .getOrCreate();

        Encoder<StringArray> stringArrayEncoder = Encoders.bean(StringArray.class);
        Encoder<RDMessage> messageEncoder = Encoders.bean(RDMessage.class);
        Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class);

        Dataset<Row> records = spark
                .readStream()
                .format("socket")
                .option("host", host)
                .option("port", port)
                .load();

        Dataset<String> inputReceived = records.as(Encoders.STRING());

        Dataset<StringArray> input = inputReceived.as(Encoders.STRING())
                .map((MapFunction<String, StringArray>) x ->
                                new StringArray(x),
                        stringArrayEncoder);

        Dataset<RDMessage> messages = input.map(
                (MapFunction<StringArray, RDMessage>) 
                    r -> new RDMessage(r), messageEncoder);

        Dataset<RDMeasurement> measurements = messages
                .map((MapFunction<RDMessage, RDMeasurement>) r ->
                        new RDMeasurement(), measurementEncoder);

        // The code executes without warning or error but despite the 
        // objects being created correctly the output of dataset is
        // is saved with nulls/nan
        StreamingQuery query = measurements.writeStream()
                .outputMode("append")
                .format("delta")
                .option("checkpointLocation",
                    "/opt/data/delta/_checkpoints/ss-socket-rd-measurement")
                .start("/opt/data/delta/ss-socket-rd-measurement");

        query.awaitTermination();
    }
}

public class StringArray implements Serializable {
    private String[] tokens;
    public StringArray(String tokens) {
        this.tokens = tokens.split(",");
    }

    // getters, setters and toString goes here
}

public class RDMeasurement implements Serializable {
    private String dataSourceName = null;
    private double dt = 0.0f;
    private double t0 = 0f;
    private double endTimestamp = 0L;
    private double[] valuesArray;

    public RDMeasurement() { }

    public RDMeasurement(String dataSourceName, double t0, 
        double dt, double endTimestamp, double[] valuesArray) {

        this.dataSourceName = dataSourceName;
        this.t0 = t0;
        this.dt = dt;
        this.endTimestamp = endTimestamp;
        this.valuesArray = valuesArray;
    }

    // getters, setters and toString goes here
}

public class RDMessage implements Serializable {
    String type;
    RDMeasurement rdMeasurement;

    public RDMessage(String type, RDMeasurement rdMeasurement) {
        this.type = type;
        this.rdMeasurement = rdMeasurement;
    }

    public RDMessage(StringArray stringArray) {
        this(stringArray.getTokens()[0] ,
                new RDMeasurement(stringArray.getTokens()[1],
                        Double.parseDouble(stringArray.getTokens()[2]),
                        Double.parseDouble(stringArray.getTokens()[3]),
                        Double.parseDouble(stringArray.getTokens()[4]),
                        toDoubleArray(5, stringArray))
        );
    }

    private static double[] toDoubleArray(int skip, StringArray stringArray) {
        double[] ret = new double[stringArray.getTokens().length - 5];
        for (int i = 0; i < stringArray.getTokens().length - 5; i++) {
            ret[i] = Double.parseDouble(stringArray.getTokens()[i+skip]);
        }
        return ret;
    }

    // getters, setters and toString goes here
}

每行输入遵循以下格式:

V1_start_rd_0,ds_1,1642442598.266,1.0,1642442618.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_1,ds_2,1642442619.266,1.0,1642442639.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_2,ds_3,1642442640.266,1.0,1642442660.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00

推荐答案

重构Java代码并添加调试代码段后,我能够识别错误。

参见重构:

StreamingQuery query = dataStreamReader.load()
        .as(Encoders.STRING())
        .map((MapFunction<String, StringArray>) x -> new StringArray(x),
                stringArrayEncoder)
        .map((MapFunction<StringArray, RDMessage>)
                r -> new RDMessage(r), messageEncoder)
        .map((MapFunction<RDMessage, RDMeasurement>) e ->
                e.getRdMeasurement(), measurementEncoder)
        /*
        .map((MapFunction<RDMeasurement, String>) e -> {
            if (e.getDataSourceName() != null) {
                System.out.println("•••> " + e);
            }
            return e.toString();
        }, Encoders.STRING())
        .map((MapFunction<String, RDMeasurement>) s -> new RDMeasurement(s),
                measurementEncoder) 
        */
        .writeStream()
        .outputMode("append")
        .format("console")
        .start();
query.awaitTermination();

上面注释的代码使我能够识别问题。

这篇关于电光为什么在三角洲湖表中写空的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆