streaming相关内容
DStream 可以有类型参数s吗? 如果是,如何? 当我在 myDStream: DStream[(A, B)](类参数)上尝试 lazy val qwe = mStream.mapWithState(stateSpec) 时,我得到: value mapWithState 不是 org.apache.spark.streaming.dstream.DStream[(A, B)]
..
我有一个带有架构的现有 Avro 文件.我需要将文件发送给 Producer. 以下是我写的代码. 公共类ProducerDataSample {公共静态无效主(字符串 [] args){String topic = "我的主题";Schema.Parser 解析器 = 新的 Schema.Parser();架构模式 = parser.parse(AvroSchemaDefinitionL
..
在我创建的名为 "sampleTopic" 的主题之一中,我有如下数据 sid,信徒 其中第一个参数是用户名,第二个参数是用户经常收听的歌曲名称.现在,我已经使用上述主题名称启动了 zookeeper、Kafka 服务器 和 producer.我已经使用 CMD 为该主题输入了上述数据.现在,我想在 spark 中读取主题执行一些聚合,并将其写回流.下面是我的代码: 包com.sparkK
..
我正在使用如下所示的查询跟踪特定字段的滚动总和: SELECT id, SUM(quantity) AS 来自流的数量 \窗口跳跃(大小 1 分钟,提前 10 秒)\按 ID 分组; 现在,对于每个输入滴答声,它似乎会返回 6 个不同的聚合值,我猜它们是针对以下时间段的: [start, start+60] 秒[开始+10,开始+60]秒[开始+20,开始+60]秒[开始+30,开始+60]
..
当我运行火花流示例 org.apache.spark.examples.streaming.JavaDirectKafkaWordCount 时,我发现了一个 EOFException 跟随,我该如何解决它 线程“main"org.apache.spark.SparkException 中的异常:java.io.EOFException:从通道读取时收到 -1,套接字可能已关闭.java.io.
..
我目前正在 Dataproc 上运行 Spark 作业,并且在尝试重新加入组并从 kafka 主题读取数据时遇到错误.我已经做了一些挖掘,但不确定是什么问题.我将 auto.offset.reset 设置为 earliest 所以它应该从最早的可用未提交偏移量读取,最初我的火花日志看起来像这样: 19/04/29 16:30:30 信息org.apache.kafka.clients.consu
..
我的目标是建立一个使用 Kafka 作为源的高吞吐量集群 &Flink 作为流处理引擎.这是我所做的. 我已经在主节点和工作节点上设置了一个 2 节点集群,以下配置. 掌握 flink-conf.yaml jobmanager.rpc.address: #本地主机jobmanager.rpc.port: 6123jobmanager.heap.mb: 256taskmanager.h
..
我正在尝试从 kafka 接收流数据.在这个过程中,我能够接收流数据并将其存储到 JavaPairInputDStream 中.现在我需要分析这些数据而不将其存储到任何数据库中.所以我想将此 JavaPairInputDStream 转换为 DataSet 或 DataFrame 到目前为止我尝试的是: import java.util.Arrays;导入 java.util.Collec
..
出于测试目的,我需要模拟客户端每秒生成 100,000 条消息并将它们发送到 kafka 主题.有什么工具或方法可以帮助我生成这些随机消息? 解决方案 有一个用于生成虚拟负载的内置工具,位于 bin/kafka-producer-perf-test.sh (https://github.com/apache/kafka/blob/trunk/bin/kafka-producer-perf-
..
我使用Kafka流处理实时数据,在Kafka流任务中,我需要访问MySQL查询数据,需要调用另一个restful服务. 所有操作都是同步的. 恐怕同步调用会降低流任务的处理能力. 这是一个好习惯吗?或者这样做有什么好主意吗? 解决方案 更好的方法是将 MySQL 表流式传输到 Kafka,并访问那里的数据.这具有将流应用程序与 MySQL 数据库分离的优势.如果您将来不再
..
我所做的是从 kafka 以 json 格式读取消息.例如 {"a":1,"b":2} 然后我对这条消息应用了一个过滤器,确保a对应的值为1,b的值为2.最后,我想将结果流输出到下游的kafka.但是,我不知道为什么编译器会说类型不匹配. 我的代码如下: val kafkaConsumer = new FlinkKafkaConsumer010(params.getRequired("
..
我的目标是使用 kafka 读取 json 格式的字符串,对字符串进行过滤,选择部分消息并将消息下沉(仍然是 json 字符串格式). 出于测试目的,我的输入字符串消息如下所示: {"a":1,"b":2,"c":"3"} 我的实现代码是: def main(args: Array[String]): Unit = {val inputProperties = new Properti
..
我使用广播模式连接两个流并从一个流读取到另一个流.代码看起来像这样 case class Broadcast extends BroadCastProcessFunction[MyObject,(String,Double), MyObject]{覆盖 def processBroadcastElement(in2: (String, Double),上下文:BroadcastProcessFu
..
我有一个基于 flink 的流应用程序,它使用 apache kafka 源和接收器.由于某些天我在开发过程中随机出现异常,我不知道它们来自哪里. 我使用 mainRunner 类在 IntelliJ 中运行应用程序,并且我通过 kafka 向它提供消息.有时第一条消息会触发错误,有时它会在几条消息之后发生. 这是它的样子: 16:31:01.935 ERROR o.a.k.c.pr
..
我正在写一个案例来测试 flink 两步提交,下面是概述. sink kafka 恰好是一次 kafka 生产者.sink step 是mysql sink 扩展两步提交.sink compare是mysql sink扩展两步提交,这个sink偶尔会抛出一个异常来模拟checkpoint failed. 当检查点失败并恢复时,我发现 mysql 两步提交可以正常工作,但是 kafka
..
我的目标是建立一个使用 Kafka 作为源的高吞吐量集群 &Flink 作为流处理引擎.这是我所做的. 我已经在主节点和工作节点上设置了一个 2 节点集群,以下配置. 掌握 flink-conf.yaml jobmanager.rpc.address: #本地主机jobmanager.rpc.port:6123jobmanager.heap.mb: 256taskmanager.he
..
我的目标是使用 kafka 读取 json 格式的字符串,对字符串进行过滤,然后将消息下沉(仍然是 json 字符串格式). 出于测试目的,我的输入字符串消息如下所示: {"a":1,"b":2} 我的实现代码是: def main(args: Array[String]): Unit = {//解析输入参数val params = ParameterTool.fromArgs(arg
..
例如我想将 1, 2, 3 和 4, 5 的流组合成一个,所以结果应该是: 1, 2, 3, 4, 5.换句话说:如果第一个源用尽 - 从第二个获取元素.我最接近的尝试是: val a = streamEnvironment.fromElements(1, 2, 3)val b = streamEnvironment.fromElements(4, 5)val c = a.union(b)c.m
..
我正在构建一个 rails/flex 应用程序,该应用程序需要录制音频,然后将其存储在我们的 amazon s3 帐户中.我发现除了使用某种形式的 RTMP 服务器通过闪存录制音频之外别无选择,但我们的托管环境不允许我们安装 FMS、Red5 等任何东西. 是否有任何现有的 Ruby/Rails RTMP 解决方案允许录音?如果没有,Rails 是否有可能至少拦截 RTMP 流,然后我可以希
..
我有一个无限制的 Kafka 流发送数据,其中包含以下字段 {"identifier": "xxx", "value": 10.0, "ts":"2019-01-16T10:51:26.326242+0000"} 我使用用于 kafka 的 apache beam sdk 读取流 import org.apache.beam.sdk.io.kafka.KafkaIO;pipeline.app
..