Deserializing Spark structured stream data from Kafka topic


Problem description

I am working off Kafka 2.3.0 and Spark 2.3.4. I have already built a Kafka Connector which reads off a CSV file and posts a line from the CSV to the relevant Kafka topic. The line is like so: "201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect". The CSV contains 1000s of such lines. The Connector is able to successfully post them on the topic and I am also able to get the message in Spark. I am not sure how I can deserialize that message to my schema. Note that the messages are headerless, so the key part of the Kafka message is null. The value part includes the complete CSV string as above. My code is below.

I looked at this - How to deserialize records from Kafka using Structured Streaming in Java? - but was unable to port it to my CSV case. In addition, I've tried other Spark SQL mechanisms to try and retrieve the individual row from the 'value' column, but to no avail. If I do manage to get a compiling version (e.g. a map over the indivValues Dataset or dsRawData) I get errors similar to: "org.apache.spark.sql.AnalysisException: cannot resolve 'IC' given input columns: [value];". If I understand correctly, that is because value is a comma-separated string and Spark isn't really going to magically map it for me without me doing 'something'.
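
For context (this illustration is mine, not part of the original question): the Kafka source exposes a fixed set of columns, so any expression that references a CSV field by name cannot be resolved until the value string has been parsed. Printing the schema of dsRawData (defined in the code below) makes this visible:

//the raw stream only carries the Kafka source columns, hence "cannot resolve 'IC'"
dsRawData.printSchema();
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)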

//build the spark session
SparkSession sparkSession = SparkSession.builder()
    .appName(seCfg.arg0AppName)
    .config("spark.cassandra.connection.host",config.arg2CassandraIp)
    .getOrCreate();

...
//my target schema is this:
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("timeOfOrigin",  DataTypes.TimestampType, true),
    DataTypes.createStructField("cName", DataTypes.StringType, true),
    DataTypes.createStructField("cRole", DataTypes.StringType, true),
    DataTypes.createStructField("bName", DataTypes.StringType, true),
    DataTypes.createStructField("stage", DataTypes.StringType, true),
    DataTypes.createStructField("intId", DataTypes.IntegerType, true),
    DataTypes.createStructField("intName", DataTypes.StringType, true),
    DataTypes.createStructField("intCatId", DataTypes.IntegerType, true),
    DataTypes.createStructField("catName", DataTypes.StringType, true),
    DataTypes.createStructField("are_vval", DataTypes.IntegerType, true),
    DataTypes.createStructField("isee_vval", DataTypes.IntegerType, true),
    DataTypes.createStructField("opCode", DataTypes.IntegerType, true),
    DataTypes.createStructField("opType", DataTypes.StringType, true),
    DataTypes.createStructField("opName", DataTypes.StringType, true)
    });
...

 Dataset<Row> dsRawData = sparkSession
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", config.arg3Kafkabootstrapurl)
    .option("subscribe", config.arg1TopicName)
    .option("failOnDataLoss", "false")
    .load();

//getting individual terms like '201310', 'XYZ001', ... from "value"
Dataset<String> indivValues = dsRawData
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING())
    .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(",")).iterator(), Encoders.STRING());

//indivValues, when printed to the console, looks like below, which confirms that I receive the data correctly and completely
/*
When printed on console, looks like this:
                +--------------------+
                |               value|
                +--------------------+
                |              201310|
                |              XYZ001|
                |                 Sup|
                |                 XYZ|
                |                   A|
                |                   0|
                |            Presales|
                |                   6|
                |             Callout|
                |                   0|
                |                   0|
                |                   1|
                |                   N|
                |            Prospect|
                +--------------------+
*/

StreamingQuery sq = indivValues.writeStream()
    .outputMode("append")
    .format("console")
    .start();
//await termination
sq.awaitTermination();

  • I need the data typed as per my custom schema shown above, since I will be running mathematical calculations over it (for every new row combined with some older rows).
  • Is it better to synthesize headers in the Kafka Connector source task before pushing the records onto the topic? Would having headers make resolving this issue simpler? (See the sketch after this list.)
  • Thank you!
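
As an aside on the headers question above (my note, not from the original answer): Kafka Connect source records have supported headers since Kafka 1.1, so the connector could attach them, but the Structured Streaming Kafka source in Spark 2.3.x does not expose record headers at all; that only arrived in Spark 3.0 with the includeHeaders option, so headers would not have simplified the parsing here. A rough sketch of attaching one anyway, where sourcePartition, sourceOffset, topicName and csvLine are placeholders for whatever the SourceTask already uses:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;

//sketch only: attach a header describing the payload format
ConnectHeaders headers = new ConnectHeaders();
headers.addString("payload.format", "csv");

//null key (as in the question) and the raw CSV line as the string value
SourceRecord record = new SourceRecord(
        sourcePartition, sourceOffset, topicName, null,
        null, null,
        Schema.STRING_SCHEMA, csvLine,
        System.currentTimeMillis(), headers);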

    Recommended answer

    I have been able to resolve this now, via use of Spark SQL. The code for the solution is below.

    //dsRawData has raw incoming data from Kafka...
    Dataset<String> indivValues = dsRawData
                    .selectExpr("CAST(value AS STRING)")
                    .as(Encoders.STRING());
    
    //create new columns, parse out the orig message and fill column with the values
    Dataset<Row> dataAsSchema2 = indivValues
                        .selectExpr("value",
                                "split(value,',')[0] as time",
                                "split(value,',')[1] as cname",
                                "split(value,',')[2] as crole",
                                "split(value,',')[3] as bname",
                                "split(value,',')[4] as stage",
                                "split(value,',')[5] as intid",
                                "split(value,',')[6] as intname",
                                "split(value,',')[7] as intcatid",
                                "split(value,',')[8] as catname",
                                "split(value,',')[9] as are_vval",
                                "split(value,',')[10] as isee_vval",
                                "split(value,',')[11] as opcode",
                                "split(value,',')[12] as optype",
                                "split(value,',')[13] as opname")
                        .drop("value");
    
    //remove any whitespaces as they interfere with data type conversions
    dataAsSchema2 = dataAsSchema2
                        .withColumn("intid", functions.regexp_replace(functions.col("int_id"),
                                " ", ""))
                        .withColumn("intcatid", functions.regexp_replace(functions.col("intcatid"),
                                " ", ""))
                        .withColumn("are_vval", functions.regexp_replace(functions.col("are_vval"),
                                " ", ""))
                        .withColumn("isee_vval", functions.regexp_replace(functions.col("isee_vval"),
                                " ", ""))
                        .withColumn("opcode", functions.regexp_replace(functions.col("opcode"),
                                " ", ""));
    
        //change types to ready for calc
    dataAsSchema2 = dataAsSchema2
                        .withColumn("intcatid",functions.col("intcatid").cast(DataTypes.IntegerType))
                        .withColumn("intid",functions.col("intid").cast(DataTypes.IntegerType))
                        .withColumn("are_vval",functions.col("are_vval").cast(DataTypes.IntegerType))
                        .withColumn("isee_vval",functions.col("isee_vval").cast(DataTypes.IntegerType))
                        .withColumn("opcode",functions.col("opcode").cast(DataTypes.IntegerType));
    
    
    //build a POJO dataset
    Encoder<Pojoclass2> encoder = Encoders.bean(Pojoclass2.class);
    Dataset<Pojoclass2> pjClass = new Dataset<Pojoclass2>(sparkSession, dataAsSchema2.logicalPlan(), encoder);
    
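    A small note on the last step (my addition, not part of the original answer): the Dataset constructor used above is not really intended as public API. Assuming the bean properties of Pojoclass2 match the column names produced earlier (time, cname, crole, and so on), the same typed dataset can usually be obtained with .as(...):

    //sketch only: alternative to the constructor above; relies on Pojoclass2
    //getters/setters lining up with the column names created earlier
    Dataset<Pojoclass2> pjClass = dataAsSchema2.as(Encoders.bean(Pojoclass2.class));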
