Deserializing Spark structured stream data from Kafka topic
Question
I am working with Kafka 2.3.0 and Spark 2.3.4. I have already built a Kafka Connector which reads off a CSV file and posts each line from the CSV to the relevant Kafka topic. A line looks like this: "201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect". The CSV contains 1000s of such lines. The Connector is able to successfully post them to the topic and I am also able to get the messages in Spark. I am not sure how I can deserialize those messages to my schema. Note that the messages are headerless, so the key part of the Kafka message is null; the value part contains the complete CSV string as above. My code is below.
I looked at this question: How to deserialize records from Kafka using Structured Streaming in Java?, but was unable to port it to my CSV case. In addition, I've tried other Spark SQL mechanisms to try and retrieve the individual rows from the 'value' column, but to no avail. If I do manage to get a compiling version (e.g. a map over the indivValues Dataset or over dsRawData), I get errors similar to: "org.apache.spark.sql.AnalysisException: cannot resolve 'IC' given input columns: [value];". If I understand correctly, that is because value is a comma-separated string and Spark isn't really going to magically map it for me without me doing 'something'.
//build the spark session
SparkSession sparkSession = SparkSession.builder()
        .appName(seCfg.arg0AppName)
        .config("spark.cassandra.connection.host", config.arg2CassandraIp)
        .getOrCreate();
...
//my target schema is this:
StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("timeOfOrigin", DataTypes.TimestampType, true),
        DataTypes.createStructField("cName", DataTypes.StringType, true),
        DataTypes.createStructField("cRole", DataTypes.StringType, true),
        DataTypes.createStructField("bName", DataTypes.StringType, true),
        DataTypes.createStructField("stage", DataTypes.StringType, true),
        DataTypes.createStructField("intId", DataTypes.IntegerType, true),
        DataTypes.createStructField("intName", DataTypes.StringType, true),
        DataTypes.createStructField("intCatId", DataTypes.IntegerType, true),
        DataTypes.createStructField("catName", DataTypes.StringType, true),
        DataTypes.createStructField("are_vval", DataTypes.IntegerType, true),
        DataTypes.createStructField("isee_vval", DataTypes.IntegerType, true),
        DataTypes.createStructField("opCode", DataTypes.IntegerType, true),
        DataTypes.createStructField("opType", DataTypes.StringType, true),
        DataTypes.createStructField("opName", DataTypes.StringType, true)
});
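For reference, the sample line maps positionally onto the 14 fields of this schema. A quick check in plain Java, independent of Spark (the class name is just for illustration):

```java
public class SchemaFieldCheck {
    public static void main(String[] args) {
        // the sample line that the Connector posts to the topic
        String line = "201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect";
        // limit -1 keeps trailing empty fields, matching CSV semantics
        String[] fields = line.split(",", -1);
        // one token per StructField above: timeOfOrigin .. opName
        System.out.println("field count: " + fields.length); // 14
        // the IntegerType columns of the schema sit at positions 5, 7, 9, 10 and 11
        System.out.println("intId=" + Integer.parseInt(fields[5])
                + " intCatId=" + Integer.parseInt(fields[7])
                + " opCode=" + Integer.parseInt(fields[11])); // intId=0 intCatId=6 opCode=1
    }
}
```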
...
Dataset<Row> dsRawData = sparkSession
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", config.arg3Kafkabootstrapurl)
        .option("subscribe", config.arg1TopicName)
        .option("failOnDataLoss", "false")
        .load();
//getting individual terms like '201310', 'XYZ001'.. from "value"
Dataset<String> indivValues = dsRawData
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING())
        .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(",")).iterator(), Encoders.STRING());
//indivValues, when printed to the console, looks like below, which confirms
//that I receive the data correctly and completely
/*
When printed on console, looks like this:
+--------------------+
| value|
+--------------------+
| 201310|
| XYZ001|
| Sup|
| XYZ|
| A|
| 0|
| Presales|
| 6|
| Callout|
| 0|
| 0|
| 1|
| N|
| Prospect|
+--------------------+
*/
StreamingQuery sq = indivValues.writeStream()
        .outputMode("append")
        .format("console")
        .start();
//await termination
sq.awaitTermination();
- I need the data typed to the custom schema shown above, since I will be running mathematical calculations on it (for every new row together with some older rows).
- Is it better to synthesize headers in the Kafka Connector source task, before pushing them to the topic? Would having headers make solving this simpler?
Thanks!
Answer
Given your existing code, the easiest way to parse your input from dsRawData is to convert it to a Dataset<String> and then use Spark's native CSV reader API:
//dsRawData has the raw incoming data from Kafka...
Dataset<String> indivValues = dsRawData
        .selectExpr("CAST(value AS STRING)")
        .as(Encoders.STRING());
//parse the CSV strings with the CSV reader, applying the target schema
Dataset<Row> finalValues = sparkSession.read()
        .schema(schema)
        .option("delimiter", ",")
        .csv(indivValues);
With such a construct you can use exactly the same CSV parsing options that are available when reading a CSV file directly with Spark.
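One of those options may matter here: the schema declares timeOfOrigin as TimestampType, but the sample value 201310 is not a timestamp in Spark's default format, so that column will likely come back null unless you also set a matching timestampFormat option. I have not run this against your topic, so treat that as a guess. A plain-Java illustration of the format mismatch (using java.time only to show the patterns; Spark 2.x itself parses with SimpleDateFormat-style patterns):

```java
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class TimestampFormatCheck {
    public static void main(String[] args) {
        String raw = "201310"; // first field of the sample CSV line
        // An ISO-style pattern, like Spark's default timestamp format, rejects it:
        try {
            DateTimeFormatter.ISO_LOCAL_DATE_TIME.parse(raw);
            System.out.println("parsed");
        } catch (DateTimeParseException e) {
            System.out.println("not a timestamp in an ISO-style format");
        }
        // an explicit yyyyMM pattern accepts it as a year-month:
        YearMonth ym = YearMonth.parse(raw, DateTimeFormatter.ofPattern("yyyyMM"));
        System.out.println(ym); // 2013-10
    }
}
```

If 201310 really is a year-month, either declare timeOfOrigin as StringType or IntegerType and convert it downstream, or set a timestampFormat option matching a pattern your data actually carries.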