spark-streaming相关内容
以下是我创建 spark kafka 流的 Scala 代码: val kafkaParams = Map[String, Object](“bootstrap.servers"->"server110:2181,server110:9092",“动物园管理员"->"server110:2181",“key.deserializer" ->classOf[StringDeserializer],
..
我在下面说明我的情况:10000 - 服务器正在发送 DF 大小数据.(每 5 秒就有 10,000 个输入到来) 如果对于任何服务器 DF 大小超过 70% 打印,将 ROM 大小增加 20%如果任何服务器使用的 DF 大小小于 30%,则将 ROM 大小减少 25%. 我提供了一个从 kafka 获取消息并与“%"匹配并执行 to.upper() 的代码.此代码仅用于参考我的 ka
..
我正在使用批处理流 (maxRatePerPartition 10.000) 从 Kafka 流式传输数据.所以在每个批次中,我处理 10.000 条 kafka 消息. 在这个批处理运行中,我通过从 rdd 创建一个数据帧来处理每条消息.处理后,我使用:dataFrame.write.mode(SaveMode.append) 将每个处理过的记录保存到同一个文件中.因此它将所有消息附加到同
..
我已经添加了 kafka 和 spark 流的 sbt 包,如下所示: "org.apache.spark" % "spark-streaming_2.10" % "1.6.1",“org.apache.spark"%“spark-streaming-kafka_2.10"%“1.6.1" 但是当我想使用 kafkadirect 流时..我无法访问它.. val 主题="CCN_TOPIC,
..
我在创建一个基本的 Spark 流应用程序时遇到了困难. 现在,我正在我的本地机器上尝试它. 我已经完成了以下设置. -设置 Zookeeper -设置Kafka(版本:kafka_2.10-0.9.0.1) -使用以下命令创建主题 kafka-topics.bat --create --zookeeper localhost:2181 --replicatio
..
(我对批处理火花知之甚少,但对火花流一无所知) 问题 我有一个 kafka 主题 Kafka[(A,B)->X] 其中 (A,B) 是键(A 和 B 是简单的数字类型),X 是消息类型,比较大(几 Mb).抛开输入失败的问题,数据是一个网格:对于A中的每一个a,都会有消息(a,b) 用于 B 中的所有 b.此外,b 是有序的,我认为我们可以假设一个 a 的所有消息都将按照 b 的顺
..
我目前尝试创建某种监控解决方案 - 一些数据被写入 kafka,我使用 Spark Streaming 读取这些数据并对其进行处理. 为了预处理机器学习和异常检测的数据,我想根据一些过滤器参数拆分流.到目前为止,我已经了解到 DStreams 本身不能分成几个流. 我主要面临的问题是许多算法(如 KMeans)只采用连续数据而不是离散数据,例如url 或其他一些字符串. 我的理想
..
我的 ElasticSearch 集群中有一些与销售相关的 JSON 数据,我想使用 Spark Streaming(使用 Spark 1.4.1)通过 Kafka 从我的电子商务网站动态聚合传入的销售事件,以获取当前视图用户的总销售额(就收入和产品而言). 从我阅读的文档中我不太清楚的是如何在 Spark 应用程序启动时从 ElasticSearch 加载历史数据,并计算例如每个用户的总收
..
我已经使用 createDirectStream 实现了 Spark Streaming.我的 Kafka 生产者每秒向具有两个分区的主题发送多条消息. 在 Spark 流媒体方面,我每秒读取一次 kafka 消息,然后我将它们按 5 秒的窗口大小和频率进行窗口化. Kafka 消息得到了正确处理,我看到了正确的计算和打印. 但在 Spark Web UI 中,在 Strea
..
尝试从 kafka 源中读取.我想从收到的消息中提取时间戳以进行结构化火花流.卡夫卡(版本 0.10.0.0)火花流(2.0.1版) 解决方案 spark.read.format("卡夫卡").option("kafka.bootstrap.servers", "your.server.com:9092").option("订阅", "你的主题").加载().select($"timest
..
我正在尝试使用 Spark Direct Stream 获取并存储 Kafka 中特定消息的偏移量.查看 Spark 文档很容易获取每个分区的范围偏移量,但我需要的是在完全扫描队列后存储主题的每条消息的起始偏移量. 解决方案 是的,你可以使用 MessageAndMetadata 版本的 createDirectStream 允许您访问消息元数据. 您可以在此处找到返回 tuple3
..
我是新的spark,请告诉我如何使用scala从apache spark中的kafka主题读取json数据. 谢谢. 解决方案 最简单的方法是利用 Spark 附带的 DataFrame 抽象. val sqlContext = new SQLContext(sc)val stream = KafkaUtils.createDirectStream[String, String,
..
我遇到了与 Kafka 相关的问题. 我的当前服务 (Producer) 将消息发送到 Kafka 主题 (events).该服务使用的是用 Java 编写的 kafka_2.12 v1.0.0. 我正在尝试将它与 spark-streaming 的示例项目集成为 Consumer 服务(此处 使用 kafka_2.11 v0.10.0,用 Scala 编写) 消息从Produc
..
这是一种奇怪的错误,因为我仍然将数据推送到 kafka 并使用来自 kafka 的消息和 线程“main"中的异常;java.lang.IllegalArgumentException:要求失败:numRecords 不能为负也有点奇怪.我搜索并没有得到任何相关的资源. 让我解释一下我的集群.我有 1 个服务器是主服务器,代理运行 mesos,我在上面设置了 3 个 kafka 代理.然后我
..
EDIT2:最后,我使用 Java 制作了自己的生产者,并且运行良好,所以问题出在 Kafka-console-producer.kafka-console-consumer 运行良好. 编辑:我已经尝试过 0.9.0.1 版本并且具有相同的行为. 我正在完成我的学士期末项目,即 Spark Streaming 和 Flink 之间的比较.在这两个框架之前,我使用 Kafka 和脚本来
..
从 spark 向 cassandra 写入数据时,没有写入数据. 闪回是: 我正在做 kafka-sparkStreaming-cassandra 集成. 我正在阅读 kafka 消息并尝试将其放入 cassandra 表 CREATE TABLE TEST_TABLE(key INT PRIMARY KEY, value TEXT). kafka 到 spark-streaming 运
..
我刚刚复制了spark streaming wodcount python代码,在Spark集群中使用spark-submit运行wordcount python代码,但是显示如下错误: py4j.protocol.Py4JJavaError:调用 o23.loadClass 时发生错误.:java.lang.ClassNotFoundException:org.apache.spark.str
..
我正在尝试在我的 Apache Spark 1.4.1 和 Kafka 0.9.0.0 之间启用 SSL,我正在使用 spark-streaming-kafka_2.10 Jar 连接到 Kafka,我正在使用 KafkaUtils.createDirectStream 方法从 Kafka 主题中读取数据. 最初,我遇到了 OOM 问题,我通过增加驱动程序内存解决了这个问题,之后我看到了下面
..
我在一个独立的集群中运行 Spark,其中 spark master、worker 和提交每个运行都在自己的 Docker 容器中. 当 spark-submit 我的 Java 应用程序带有 --repositories 和 --packages 选项时,我可以看到它成功下载了应用程序所需的依赖项.然而,stderr 日志在 spark workers 网络用户界面上报告了 java.la
..
我很困惑为什么我只能在 spark web UI 页面(8080)中看到一个 KafkaReceiver,但是我在Kafka中有10个分区,我在spark集群中使用了10个核心,我的代码在python中如下:kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer",{topic: 10})我想 KafkaRece
..