从Kafka主题反序列化Spark结构化流数据 [英] Deserializing Spark structured stream data from Kafka topic

查看:260
本文介绍了从Kafka主题反序列化Spark结构化流数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Kafka 2.3.0和Spark 2.3.4.我已经建立了一个Kafka连接器,该连接器可以读取CSV文件,并将CSV中的一行内容发布到相关的Kafka主题.该行是这样的: "201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect". CSV包含1000条此类行.连接器可以成功地将它们发布到主题上,并且我还可以在Spark中获取消息.我不确定如何将该消息反序列化到我的模式?请注意,消息是无标题的,因此kafka消息中的关键部分为空.值部分包括上述的 complete CSV字符串.我的代码在下面.

I am working off Kafka 2.3.0 and Spark 2.3.4. I have already built a Kafka Connector which reads off a CSV file and posts a line from the CSV to the relevant Kafka topic. The line is like so: "201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect". The CSV contains 1000s of such lines. The Connector is able to successfully post them on the topic and I am also able to get the message in Spark. I am not sure how can I deserialize that message to my schema? Note that the messages are headerless so the key part in the kafka message is null. The value part includes the complete CSV string as above. My code is below.

我查看了此内容-如何在Java中使用结构化流从Kafka反序列化记录?,但无法将其移植到我的csv案例中.另外,我尝试了其他Spark sql机制来尝试从值"列中检索单个行,但无济于事.如果我确实设法获得了编译版本(例如,在indivValues数据集或dsRawData上的映射),则会收到类似以下错误:"org.apache.spark.sql.AnalysisException:给定输入列,无法解析'IC':[value ];".如果我理解正确,那是因为value是一个用逗号分隔的字符串,而spark在没有我做某事"的情况下并不会真的为我神奇地映射它.

I looked at this - How to deserialize records from Kafka using Structured Streaming in Java? but was unable to port it to my csv case. In addition I've tried other spark sql mechanisms to try and retrieve the individual row from the 'value' column but to no avail. If I do manage to get a compiling version (e.g. a map over the indivValues Dataset or dsRawData) I get errors similar to: "org.apache.spark.sql.AnalysisException: cannot resolve 'IC' given input columns: [value];". If I understand correctly, it is because value is a comma separated string and spark isn't really going to magically map it for me without me doing 'something'.

//build the spark session
SparkSession sparkSession = SparkSession.builder()
    .appName(seCfg.arg0AppName)
    .config("spark.cassandra.connection.host",config.arg2CassandraIp)
    .getOrCreate();

...
//my target schema is this:
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("timeOfOrigin",  DataTypes.TimestampType, true),
    DataTypes.createStructField("cName", DataTypes.StringType, true),
    DataTypes.createStructField("cRole", DataTypes.StringType, true),
    DataTypes.createStructField("bName", DataTypes.StringType, true),
    DataTypes.createStructField("stage", DataTypes.StringType, true),
    DataTypes.createStructField("intId", DataTypes.IntegerType, true),
    DataTypes.createStructField("intName", DataTypes.StringType, true),
    DataTypes.createStructField("intCatId", DataTypes.IntegerType, true),
    DataTypes.createStructField("catName", DataTypes.StringType, true),
    DataTypes.createStructField("are_vval", DataTypes.IntegerType, true),
    DataTypes.createStructField("isee_vval", DataTypes.IntegerType, true),
    DataTypes.createStructField("opCode", DataTypes.IntegerType, true),
    DataTypes.createStructField("opType", DataTypes.StringType, true),
    DataTypes.createStructField("opName", DataTypes.StringType, true)
    });
...

 Dataset<Row> dsRawData = sparkSession
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", config.arg3Kafkabootstrapurl)
    .option("subscribe", config.arg1TopicName)
    .option("failOnDataLoss", "false")
    .load();

//getting individual terms like '201310', 'XYZ001'.. from "values"
Dataset<String> indivValues = dsRawData
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING())
    .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(",")).iterator(), Encoders.STRING());

//indivValues when printed to console looks like below which confirms that //I receive the data correctly and completely
/*
When printed on console, looks like this:
                +--------------------+
                |               value|
                +--------------------+
                |              201310|
                |              XYZ001|
                |                 Sup|
                |                 XYZ|
                |                   A|
                |                   0|
                |            Presales|
                |                   6|
                |             Callout|
                |                   0|
                |                   0|
                |                   1|
                |                   N|
                |            Prospect|
                +--------------------+
*/

StreamingQuery sq = indivValues.writeStream()
    .outputMode("append")
    .format("console")
    .start();
//await termination
sq.awaitTermination();

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆