avro相关内容
我们正在尝试使用 Confluent JDBC Sink 连接器将主题中的值写回 postgres 数据库. connector.class=io.confluent.connect.jdbc.JdbcSinkConnector连接.密码=xxx任务.max=1主题=主题名称自动进化=真connection.user=confluent_rw自动创建=真connection.url=jdbc:p
..
我有一个用例,我有一个 JSON,我想从 JSON 中生成模式和记录并发布记录.我已经配置了值序列化器并且架构设置向后兼容. 第一个 JSON 字符串 json = "{\n" + " \"id\": 1,\n" +" \"名称\": \"耳机\",\n" +" \"价格\": 1250.0,\n" +" \"标签\": [\"home\", \"green\"]\n" +"}\n
..
我的 Avro 架构中有一个状态枚举字段,其中当前可能的状态为 待定得到正式认可的拒绝了 我想在这个枚举“RESUBMIT"中再添加一个值.此更改是否向后兼容? 解决方案 不,不是.原因如下:如果您使用这个新模式来向 Kafka 发送事件,那么所有客户端都将尝试反序列化该值.当出现新类型的事件时,会有两种情况: 具有新架构的客户端将成功反序列化 具有旧架构的客户端将无法反序列化
..
我正在编写一个 Apache Flink 流应用程序,它对从 Kafka 总线读取的数据(Avro 格式)进行反序列化(有关 此处).数据被反序列化为 Scala 案例类.我在运行程序时遇到异常,它收到了来自 Kafka 的第一条消息 线程“main"中的异常 org.apache.flink.runtime.client.JobExecutionException: java.lang.Run
..
背景 我编写了几个小的 Kafka Connect 连接器.一个每秒生成随机数据,另一个将其记录在控制台中.它们与 Schema Registry 集成,因此数据是使用 Avro 进行序列化. 我使用 fast-data-dev Docker 镜像将它们部署到本地 Kafka 环境中Landoop 基本设置工作并每秒产生一条记录的消息 但是,我想更改 主题名称策略.默认生成
..
主要目标是聚合两个 Kafka 主题,一个是压缩的慢速移动数据,另一个是每秒接收到的快速移动数据. 我已经能够在诸如 KV (Long,String) 之类的简单场景中使用类似以下内容的消息: PCollection>输入 = p.apply(KafkaIO.读取().withKeyDeserializer(LongDeserializer.class).withValueDeserial
..
有没有办法使用模式来转换avro 来自 kafka 的消息,带有 spark 到 以及来自 SqlNetworkWordCount 示例 和 Kafka、Spark 和 Avro - 第 3 部分,生成和使用 Avro 消息以读取消息. 对象注入{val 解析器 = 新的 Schema.Parser()val schema = parser.parse(getClass.getResour
..
我的 KafkaProducer 能够使用 KafkaAvroSerializer 将对象序列化到我的主题.但是,KafkaConsumer.poll() 返回反序列化的 GenericRecord 而不是我的序列化类. MyKafkaProducer KafkaProducer生产者;尝试 (InputStream props = Resources.getResource("prod
..
我是 Kafka 和 Avro 的菜鸟.所以我一直试图让生产者/消费者运行.到目前为止,我已经能够使用以下方法生成和使用简单的字节和字符串:生产者的配置: Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.ap
..
我正在尝试使用 Avro 来读取/写入 Kafka 的消息.有没有人有使用 Avro 二进制编码器对将放入消息队列的数据进行编码/解码的示例? 与 Kafka 部分相比,我更需要 Avro 部分.或者,也许我应该看看不同的解决方案?基本上,我试图找到一个更有效的 JSON 空间解决方案.刚才提到了 Avro,因为它可以比 JSON 更紧凑. 解决方案 我终于想起去问 Kafka 邮件
..
我一直在尝试序列化 avro 通用记录并生成 avro 序列化数据以发送到 kafka.主要目标是不使用融合模式注册表来存储模式,而是将模式与序列化数据一起发送,以便可以从 kafka 主题中提取并反序列化. 以下是 AvroSerializer 用于生成 Avro 数据的部分. @覆盖公共字节[] 序列化(字符串主题,T 数据){尝试 {字节 [] 结果 = 空;如果(数据!= null
..
我在 python spark 应用程序中创建了一个 kafka 流,并且可以解析通过它出现的任何文本. kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}) 我想将其更改为能够解析来自 kafka 主题的 avro 消息.从文件解析 avro 消息时,我这样做
..
我在 Spark 中有一个 Dataframe 看起来像 eventDF Sno|UserID|TypeExp1|JAS123|电影2|ASP123|游戏3|JAS123|服装4|DPS123|电影5|DPS123|服装6|ASP123|医疗7|JAS123|OTH8|POQ133|医疗......10000|DPS123|OTH 我需要将它以Avro格式写入Kafka主题目前我可以使
..
我需要使用 Confluent kafka-avro-serializer Maven 工件.从 official guide 我应该添加这个存储库到我的 Maven pom 汇合http://packages.confluent.io/maven/ 问题是 URL http://packages.confluent.io/ma
..
我有一个 spark 2.0 应用程序,它使用 spark 流(使用 spark-streaming-kafka-0-10_2.11)从 kafka 读取消息. 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它. 在常规流中,我使用 kafkaUtils 来 createDstrean,在我传递的参数中,它是值反序列化器. 在 Structured Streamin
..
当我尝试使用我各自的架构对数据运行 Kafka Consumer with Avro 时,它返回一个错误 "AvroRuntimeException: Malformed data. Length is negative: -40" .我看到其他人也有类似的问题 将字节数组转换为 json,Avro 写入和读取,以及 Kafka Avro Binary *coder.我还参考了这个 Consume
..
我正在尝试使用 PySpark 2.4.0 从 Kafka 读取 avro 消息. spark-avro 外部模块可以提供这个读取avro的方案文件: df = spark.read.format("avro").load("examples/src/main/resources/users.avro")df.select("name", "favorite_color").write.f
..
我从远程服务器 Kafka Avro 用 Python 接收消息(使用 Confluent Kafka Python 库的使用者),这些消息用带有用户代理、位置、url 等字段的 json 字典表示点击流数据.这是一条消息看起来像: b'\x01\x00\x00\xde\x9e\xa8\xd5\x8fW\xec\x9a\xa8\xd5\x8fW\x1axxx.xxx.xxx.xxx\x02:ht
..
我在 Spark Structured Streaming 中使用 Kafka Source 来接收 Confluent 编码的 Avro 记录.我打算使用 Confluent Schema Registry,但是与 Spark 结构化流的集成似乎是不可能的. 我见过这个问题,但无法在 Confluent Schema Registry 中使用.使用 Spark 2.0 从 Kafka 读取
..
在回顾示例时,我看到了很多这样的内容: FlinkKafkaConsumer08kafkaConsumer = new FlinkKafkaConsumer08("myavrotopic", avroSchema, properties); 我看到他们在这里已经知道架构. 在将 byte[] 读入通用记录之前,我不知道架构然后获取架构.(因为它可能会因记录而异) 有人可以将我指向从
..