streaming相关内容

读取现有的 Avro 文件并发送到 Kafka

我有一个带有架构的现有 Avro 文件.我需要将文件发送给 Producer. 以下是我写的代码. 公共类ProducerDataSample {公共静态无效主(字符串 [] args){String topic = "我的主题";Schema.Parser 解析器 = 新的 Schema.Parser();架构模式 = parser.parse(AvroSchemaDefinitionL ..
发布时间:2021-11-12 02:44:08 Java开发

无法使用spark读取kafka主题数据

在我创建的名为 "sampleTopic" 的主题之一中,我有如下数据 sid,信徒 其中第一个参数是用户名,第二个参数是用户经常收听的歌曲名称.现在,我已经使用上述主题名称启动了 zookeeper、Kafka 服务器 和 producer.我已经使用 CMD 为该主题输入了上述数据.现在,我想在 spark 中读取主题执行一些聚合,并将其写回流.下面是我的代码: 包com.sparkK ..
发布时间:2021-11-12 02:43:15 其他开发

KSQL 跳跃窗口:仅访问最旧的子窗口

我正在使用如下所示的查询跟踪特定字段的滚动总和: SELECT id, SUM(quantity) AS 来自流的数量 \窗口跳跃(大小 1 分钟,提前 10 秒)\按 ID 分组; 现在,对于每个输入滴答声,它似乎会返回 6 个不同的聚合值,我猜它们是针对以下时间段的: [start, start+60] 秒[开始+10,开始+60]秒[开始+20,开始+60]秒[开始+30,开始+60] ..
发布时间:2021-11-12 02:39:01 其他开发

Spark 设置为从最早的偏移量读取 - 在尝试使用 Kafka 上不再可用的偏移量时引发错误

我目前正在 Dataproc 上运行 Spark 作业,并且在尝试重新加入组并从 kafka 主题读取数据时遇到错误.我已经做了一些挖掘,但不确定是什么问题.我将 auto.offset.reset 设置为 earliest 所以它应该从最早的可用未提交偏移量读取,最初我的火花日志看起来像这样: 19/04/29 16:30:30 信息org.apache.kafka.clients.consu ..

Spark中如何将JavaPairInputDStream转换为DataSet/DataFrame

我正在尝试从 kafka 接收流数据.在这个过程中,我能够接收流数据并将其存储到 JavaPairInputDStream 中.现在我需要分析这些数据而不将其存储到任何数据库中.所以我想将此 JavaPairInputDStream 转换为 DataSet 或 DataFrame 到目前为止我尝试的是: import java.util.Arrays;导入 java.util.Collec ..
发布时间:2021-11-12 02:10:03 Java开发

在 Kafka 流作业中进行同步数据库查询或宁静调用是一个好习惯吗?

我使用Kafka流处理实时数据,在Kafka流任务中,我需要访问MySQL查询数据,需要调用另一个restful服务. 所有操作都是同步的. 恐怕同步调用会降低流任务的处理能力. 这是一个好习惯吗?或者这样做有什么好主意吗? 解决方案 更好的方法是将 MySQL 表流式传输到 Kafka,并访问那里的数据.这具有将流应用程序与 MySQL 数据库分离的优势.如果您将来不再 ..
发布时间:2021-11-12 01:58:06 其他开发

在 flink 1.2 中下沉 kafka 流时出错

我所做的是从 kafka 以 json 格式读取消息.例如 {"a":1,"b":2} 然后我对这条消息应用了一个过滤器,确保a对应的值为1,b的值为2.最后,我想将结果流输出到下游的kafka.但是,我不知道为什么编译器会说类型不匹配. 我的代码如下: val kafkaConsumer = new FlinkKafkaConsumer010(params.getRequired(" ..
发布时间:2021-11-12 01:17:22 其他开发

Apache flink 广播状态被刷新

我使用广播模式连接两个流并从一个流读取到另一个流.代码看起来像这样 case class Broadcast extends BroadCastProcessFunction[MyObject,(String,Double), MyObject]{覆盖 def processBroadcastElement(in2: (String, Double),上下文:BroadcastProcessFu ..
发布时间:2021-11-12 01:13:57 其他开发

在 flink 应用程序中处理流运算符期间加入 ioThread 时中断/错误

我有一个基于 flink 的流应用程序,它使用 apache kafka 源和接收器.由于某些天我在开发过程中随机出现异常,我不知道它们来自哪里. 我使用 mainRunner 类在 IntelliJ 中运行应用程序,并且我通过 kafka 向它提供消息.有时第一条消息会触发错误,有时它会在几条消息之后发生. 这是它的样子: 16:31:01.935 ERROR o.a.k.c.pr ..
发布时间:2021-11-12 01:12:20 其他开发

当检查点恢复时,flink kafkaproducer 以完全一次模式发送重复消息

我正在写一个案例来测试 flink 两步提交,下面是概述. sink kafka 恰好是一次 kafka 生产者.sink step 是mysql sink 扩展两步提交.sink compare是mysql sink扩展两步提交,这个sink偶尔会抛出一个异常来模拟checkpoint failed. 当检查点失败并恢复时,我发现 mysql 两步提交可以正常工作,但是 kafka ..
发布时间:2021-11-12 01:05:37 其他开发

如何在 Apache Flink 中连接两个流

例如我想将 1, 2, 3 和 4, 5 的流组合成一个,所以结果应该是: 1, 2, 3, 4, 5.换句话说:如果第一个源用尽 - 从第二个获取元素.我最接近的尝试是: val a = streamEnvironment.fromElements(1, 2, 3)val b = streamEnvironment.fromElements(4, 5)val c = a.union(b)c.m ..
发布时间:2021-11-12 00:57:51 其他开发

通过 RTMP/Rails 录制音频

我正在构建一个 rails/flex 应用程序,该应用程序需要录制音频,然后将其存储在我们的 amazon s3 帐户中.我发现除了使用某种形式的 RTMP 服务器通过闪存录制音频之外别无选择,但我们的托管环境不允许我们安装 FMS、Red5 等任何东西. 是否有任何现有的 Ruby/Rails RTMP 解决方案允许录音?如果没有,Rails 是否有可能至少拦截 RTMP 流,然后我可以希 ..
发布时间:2021-11-11 23:53:08 其他开发