Integrating Spark Structured Streaming with the Confluent Schema Registry
Problem description
I'm using a Kafka source in Spark Structured Streaming to receive Confluent-encoded Avro records. I intend to use the Confluent Schema Registry, but integrating it with Spark Structured Streaming seems to be impossible.
I have seen this question, but was unable to get it working with the Confluent Schema Registry: Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)
Recommended answer
Disclaimer
This code was only tested on a local master, and it has been reported to run into serializer issues in a clustered environment. There's an alternative solution (steps 7-9, with Scala code in step 10) that extracts the schema IDs into a column, looks up each unique ID, and then uses broadcast variables for the schemas; that approach works better at scale.
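The alternative approach relies on the Confluent wire format: each Avro message produced by the Confluent serializer starts with a magic byte 0, followed by a 4-byte big-endian schema ID, followed by the Avro binary body. Below is a minimal, dependency-free sketch of the first two steps (pull the schema ID out of each payload, then look up each distinct ID only once). The names `SchemaIdSketch`, `schemaIdOf` and `resolveSchemas` are hypothetical, and the `lookup` function is an assumed stand-in for a real registry call (for example `SchemaRegistryClient.getById` in Confluent 5.x clients).

```scala
import java.nio.ByteBuffer

object SchemaIdSketch {
  // Confluent wire format: [magic byte 0][4-byte big-endian schema ID][Avro body]
  val MagicByte: Byte = 0
  val HeaderSize: Int = 5

  /** Returns the schema ID if the payload carries a Confluent header, otherwise None. */
  def schemaIdOf(payload: Array[Byte]): Option[Int] =
    if (payload == null || payload.length < HeaderSize || payload(0) != MagicByte) None
    else Some(ByteBuffer.wrap(payload, 1, 4).getInt) // ByteBuffer is big-endian by default

  /** Calls `lookup` exactly once per distinct schema ID found in the payloads (hypothetical helper). */
  def resolveSchemas[S](payloads: Seq[Array[Byte]], lookup: Int => S): Map[Int, S] =
    payloads.flatMap(schemaIdOf).distinct.map(id => id -> lookup(id)).toMap
}
```

In Spark, `schemaIdOf` could be wrapped in a UDF to add a schema-ID column, and the resulting ID-to-schema map is the kind of small lookup table that can be broadcast to executors; that Spark wiring is not shown here.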
Also, there is an external library, AbsaOSS/ABRiS, that addresses using the Schema Registry with Spark.
Since the other answer, which was mostly useful, was removed, I wanted to re-add it with some refactoring and comments.
Here are the dependencies needed. The code was tested with Confluent 5.x and Spark 2.4.
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
<exclusions>
<!-- Conflicts with Spark's version -->
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<!-- The artifact suffix must be the Scala binary version, e.g. 2.11 or 2.12 -->
<artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
And here is the Scala implementation (only tested locally with master=local[*]):
First, define the imports, some fields, and a few helper methods to get schemas:
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode
object App {
private var schemaRegistryClient: SchemaRegistryClient = _
private var kafkaAvroDeserializer: AvroDeserializer = _
def lookupTopicSchema(topic: String, isKey: Boolean = false) = {
schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
}
def avroSchemaToSparkSchema(avroSchema: String) = {
SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
}
// ... continues below
Then define a simple main method that parses the command-line arguments to get the Kafka details:
def main(args: Array[String]): Unit = {
val cmd: CommandLine = parseArg(args)
val master = cmd.getOptionValue("master", "local[*]")
val spark = SparkSession.builder()
.appName(App.getClass.getName)
.master(master)
.getOrCreate()
val bootstrapServers = cmd.getOptionValue("bootstrap-server")
val topic = cmd.getOptionValue("topic")
val schemaRegistryUrl = cmd.getOptionValue("schema-registry")
consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)
spark.stop()
}
// ... still continues
Next comes the important method, which consumes the Kafka topic and deserializes it:
private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
import spark.implicits._
// Setup the Avro deserialization UDF
schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
spark.udf.register("deserialize", (bytes: Array[Byte]) =>
kafkaAvroDeserializer.deserialize(bytes)
)
// Load the raw Kafka topic (byte stream)
val rawDf = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
// Deserialize byte stream into strings (Avro fields become JSON)
import org.apache.spark.sql.functions._
val jsonDf = rawDf.select(
// 'key.cast("string"), // plain string keys are simplest to use
callUDF("deserialize", 'key).as("key"), // but sometimes the keys are Avro too
callUDF("deserialize", 'value).as("value")
// excluding topic, partition, offset, timestamp, etc
)
// Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
val dfValueSchema = {
val rawSchema = lookupTopicSchema(topic)
avroSchemaToSparkSchema(rawSchema)
}
// Apply structured schema to JSON stream
val parsedDf = jsonDf.select(
'key, // keys are usually plain strings
// values are JSONified Avro records
from_json('value, dfValueSchema.dataType).alias("value")
).select(
'key,
$"value.*" // flatten out the value
)
// parsedDf.printSchema()
// Sample schema output
// root
// |-- key: string (nullable = true)
// |-- header: struct (nullable = true)
// | |-- time: long (nullable = true)
// | ...
// TODO: Do something interesting with this stream
parsedDf.writeStream
.format("console")
.outputMode(OutputMode.Append())
.option("truncate", false)
.start()
.awaitTermination()
}
// still continues
The command-line parser accepts the bootstrap servers, the Schema Registry URL, the topic name, and the Spark master.
private def parseArg(args: Array[String]): CommandLine = {
import org.apache.commons.cli._
val options = new Options
val masterOption = new Option("m", "master", true, "Spark master")
masterOption.setRequired(false)
options.addOption(masterOption)
val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
bootstrapOption.setRequired(true)
options.addOption(bootstrapOption)
val topicOption = new Option("t", "topic", true, "Kafka topic")
topicOption.setRequired(true)
options.addOption(topicOption)
val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
schemaRegOption.setRequired(true)
options.addOption(schemaRegOption)
val parser = new BasicParser
parser.parse(options, args)
}
// still continues
For the UDF above to work, there needs to be a deserializer that turns the DataFrame of bytes into one containing the deserialized Avro data:
// Simple wrapper around Confluent deserializer
class AvroDeserializer extends AbstractKafkaAvroDeserializer {
def this(client: SchemaRegistryClient) {
this()
// TODO: configure the deserializer for authentication
this.schemaRegistry = client
}
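override def deserialize(bytes: Array[Byte]): String = {
val value = super.deserialize(bytes)
value match {
case null =>
// null keys/values (e.g. tombstones) deserialize to null; calling toString on them would throw
null
case str: String =>
str
case record: GenericRecord =>
// Avro renders a record's toString as JSON, which from_json can then parse
record.toString
case other =>
// other primitive Avro types (int, long, ...) are not GenericRecords
String.valueOf(other)
}
}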
}
} // end 'object App'
Put each of these blocks together, and it works in IntelliJ after adding -b localhost:9092 -s http://localhost:8081 -t myTopic to Run Configurations > Program Arguments.
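The commented-out key line in consumeAvro hints that keys are sometimes plain strings rather than Avro. Passing such bytes to the Confluent deserializer fails because the first byte is not the magic byte 0 (it raises an "Unknown magic byte!" error). A minimal, standalone sketch of a key decoder that only routes Confluent-framed payloads to the Avro path is shown below. It is not part of object App: `KeyDecodingSketch` and `decodeKey` are hypothetical names, `avroDecode` is an assumed stand-in for a call such as kafkaAvroDeserializer.deserialize(bytes), and the magic-byte check is only a heuristic.

```scala
import java.nio.charset.StandardCharsets

object KeyDecodingSketch {
  /**
   * Decodes a Kafka key that may be either Confluent-framed Avro or a plain UTF-8 string.
   * `avroDecode` stands in for the Avro deserializer (hypothetical parameter).
   */
  def decodeKey(bytes: Array[Byte], avroDecode: Array[Byte] => String): String =
    if (bytes == null) null                                        // null keys stay null
    else if (bytes.length >= 5 && bytes(0) == 0) avroDecode(bytes) // magic byte + 4-byte schema ID
    else new String(bytes, StandardCharsets.UTF_8)                 // fall back to a plain string
}
```

A text key whose first byte happens to be 0 would be misrouted, so this check suits topics where string keys are ordinary printable text.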