avro相关内容
我有一个最终会有很多不同模式的主题.现在它只有一个.我已经通过 REST 创建了一个连接作业,如下所示: {"name":"com.mycompany.sinks.GcsSinkConnector-auth2",“配置":{"connector.class": "com.mycompany.sinks.GcsSinkConnector","topics": "auth.events","flus
..
我有一个具体的类,我在字节数组中对其进行序列化以发送到 Kafka 主题.对于序列化,我使用 ReflectDatumWriter .在发送字节 [] 之前,我在查看一些在线教程后将模式 ID 与模式 ID 放在前 4 个字节中. 我能够发送消息,但在 Avro 控制台消费者中使用它时,我得到的响应为: ./bin/kafka-avro-console-consumer --boots
..
我正在尝试使用 KafkaAvroSerialzer 设置一个 kafka 生产者以获取价值.每当 rit 尝试创建 Producer 时,我都会遇到此错误.我正在使用 confluent 5.2.1 中提供的所有 jar java.lang.NoClassDefFoundError: 无法初始化类 io.confluent.kafka.schemaregistry.client.rest.Re
..
我正在尝试将 avro 字节流反序列化为 Scala 案例类对象.基本上,我有一个带有 avro 编码数据流的 kafka 流,现在有一个对架构的补充,我正在尝试更新 Scala 案例类以包含新字段.案例类看起来像这样 /** Case 类来保存设备数据.*/案例类 DeviceData(deviceId: String,sw_version:字符串,时间戳:字符串,阅读:双,新字段:选项[字符
..
尝试使用 confluent_kafka.AvroConsumer 来消费来自给定时间戳的消息. if 标志:# 创建一个列表topic_partitions_to_search = 列表(map(lambda p: TopicPartition('my_topic2', p, int(time.time())), range(0, 1)))print("用 %s 搜索偏移量" % topic_
..
带有 avro 的 kafka 模式管理为我们提供了向后兼容性的灵活性,但我们如何处理方案中的重大更改? 假设生产者 A 向消费者 C 发布消息 M 假设消息 M 的方案发生了重大变化(例如,名称字段现在被拆分为 first_name 和 last_name)并且我们有了新的方案 M-New 现在我们正在部署生产者 A-New 和消费者 C-New 问题是,在我们的部署过程
..
我正在用 Java 编写一个 Kafka 流应用程序,它接受由连接器创建的输入主题,该连接器使用模式注册表和 avro 作为键和值转换器.连接器产生以下架构: key-schema: "int"价值模式:{"类型": "记录","name": "用户",“领域":[{"name": "firstname", "type": "string"},{"name": "lastname", "type
..
我设置了一个融合的 s3 接收器连接,它将 .avro 文件存储在 s3 中. 我转储这些文件,发现它们只是消息本身,我不知道在哪里可以找到消息密钥,知道吗? 配置如下: {"name": "s3-sink-test",“配置":{"connector.class": "io.confluent.connect.s3.S3SinkConnector","tasks.max": "1","
..
我正在用 Java 编写一个 Kafka 流应用程序,它接受由连接器创建的输入主题,该连接器使用模式注册表和 avro 作为键和值转换器.连接器产生以下架构: key-schema: "int"价值模式:{"类型": "记录","name": "用户",“领域":[{"name": "firstname", "type": "string"},{"name": "lastname", "type
..
我正在编写一个 REST 代理,比如融合的休息代理.它接受一个 JSON 负载、模式主题和 id,然后将 JSON 负载作为 Avro 对象写入流中.当我使用 kafka-avro-console-consumer 读取消息时,出现“未知幻字节"错误. 这是我的 kafka 生产者配置: properties.put("client.id", LocalHostUtils.getLoca
..
据我所知,我们可以在 Kafka 上定义 AVRO 模式,使用该模式定义的主题将只接受与该模式匹配的数据.在接受进入队列之前验证数据结构非常有用. Google Pub/Sub 中有没有类似的东西? 解决方案 Kafka 本身不验证模式,因此主题本身没有模式,除了一对字节数组和一些元数据.序列化程序是生产客户端的一部分,在数据到达主题之前执行验证.同样,在 PubSub 中,归根结底
..
我编写了一个小型 Java 程序,用于监视新文件的目录并将它们以二进制 Avro 格式发送到 Kafka 主题.我是 Avro 的新手,我是使用 Avro 文档和在线示例编写的.监控部分运行良好,但程序在运行时遇到 Avro 序列化失败.我收到此错误堆栈: 线程“main"中的异常 java.lang.ClassCastException: [B 不能转换为 java.nio.ByteBuffe
..
据我所知,我们可以在 Kafka 上定义 AVRO 模式,使用该模式定义的主题将只接受与该模式匹配的数据.在接受进入队列之前验证数据结构非常有用. Google Pub/Sub 中有没有类似的东西? 解决方案 Kafka 本身不验证模式,因此主题本身没有模式,除了一对字节数组和一些元数据.序列化程序是生产客户端的一部分,在数据到达主题之前执行验证.同样,在 PubSub 中,归根结底
..
当我想发送包含 long 类型字段的 AVRO 消息时,出现以下错误: Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 61引起:java.lang.ClassCastException:java.lang.Long 不能转换为 or
..
我有一个带有架构的现有 Avro 文件.我需要将文件发送给 Producer. 以下是我写的代码. 公共类ProducerDataSample {公共静态无效主(字符串 [] args){String topic = "我的主题";Schema.Parser 解析器 = 新的 Schema.Parser();架构模式 = parser.parse(AvroSchemaDefinitionL
..
我正在将来自 Kafka 的消息读入 Flink Shell (Scala),如下: scala>val stream = senv.addSource(new FlinkKafkaConsumer011[String]("topic", new SimpleStringSchema(), properties)).print()警告:有一个弃用警告;使用 -deprecation 重新运行以获
..
我正在 Scala 中构建一个 Apache Flink 应用程序,它从 Kafka 总线读取流数据,然后对其执行汇总操作.来自 Kafka 的数据是 Avro 格式,需要一个特殊的反序列化类.我找到了这个 Scala 类 AvroDeserializationScehema (http://codegists.com/snippet/scala/avrodeserializationschema
..
虽然为字段定义了默认值,kafka-avro-console-producer 完全忽略它: $ kafka-avro-console-producer --broker-list localhost:9092 --topic test-avro \--property schema.registry.url=http://localhost:8081 --property \value.sch
..
我有许多微服务在 Kafka 中读取/写入 Avro 消息. 架构很棒.阿罗很棒.但是真的需要架构注册表吗?它有助于集中架构,是的,但是微服务真的需要查询注册表吗?我不这么认为. 每个微服务都有一个架构副本,user.avsc,以及一个 Avro 生成的 POJO:User extends SpecificRecord.我想要每个 Schema 的 POJO,以便在代码中轻松操作.
..
我正在尝试将 kafka 中的 Avro messgaes 反序列化为从 Avro Schema 生成的 POJO.我正在使用 KafkaAvroDeserializer 进行此转换. 我能够在从 kafka 返回的 ConsumerRecord 记录中看到 GenericRecord.但是,当我尝试将此记录分配给生成的 POJO 类对象时,date 类型的 POJO 字段与 ClassCa
..