spark-streaming相关内容

spark流检查点恢复非常非常慢

目标:通过 Spark 流从 Kinesis 读取数据并将数据以 Parquet 格式存储到 S3. 情况:应用程序最初运行良好,运行批次为 1 小时,处理时间平均不到 30 分钟.出于某种原因,可以说应用程序崩溃了,我们尝试从检查点重新启动.处理现在需要永远并且不会继续前进.我们试图以 1 分钟的批处理间隔测试相同的东西,处理运行良好,批处理需要 1.2 分钟才能完成.当我们从检查点恢复时,每 ..
发布时间:2021-11-27 10:21:49 其他开发

Spark Streaming 1.6.0 中检查点/WAL 的可靠性问题

说明 我们在 Scala 中有一个 Spark Streaming 1.5.2 应用程序,它从 Kinesis Stream 读取 JSON 事件,执行一些转换/聚合并将结果写入不同的 S3 前缀.当前批处理间隔为 60 秒.我们有 3000-7000 个事件/秒.我们正在使用检查点来保护我们免于丢失聚合. 它运行良好有一段时间了,从异常中恢复,甚至集群重新启动.我们最近重新编译了 S ..
发布时间:2021-11-27 10:20:02 其他开发

使用 Spark DStream 作为 Akka 流的源的惯用方式

我正在构建一个 REST API,它在 Spark 集群中启动一些计算,并以分块的结果流进行响应.给定有计算结果的 Spark 流,我可以使用 dstream.foreachRDD() 将数据发送出 Spark.我正在使用 akka-http 发送分块的 HTTP 响应: val requestHandler: HttpRequest =>HttpResponse = {case HttpR ..
发布时间:2021-11-26 22:44:15 其他开发

pySpark Kafka Direct Streaming 更新 Zookeeper/Kafka Offset

目前我正在使用 Kafka/Zookeeper 和 pySpark (1.6.0).我已经成功创建了一个 kafka 消费者,它使用了 KafkaUtils.createDirectStream(). 所有流媒体都没有问题,但我认识到,在我消费了一些消息后,我的 Kafka 主题没有更新到当前偏移量. 因为我们需要更新主题以在此处进行监控,所以这有点奇怪. 在 Spark 的文档 ..
发布时间:2021-11-14 23:55:51 其他开发

Apache Zeppelin 0.6.1:运行 Spark 2.0 Twitter Stream 应用程序

我有一个安装了 Spark 2.0 和 Zeppelin 0.6.1 的集群.由于 TwitterUtils.scala 类已从 Spark 项目移至 Apache Bahir,我无法再在我的 Zeppelin 笔记本中使用 TwitterUtils. 这是我的笔记本的片段: 依赖加载: %depz.resetz.load("org.apache.bahir:spark-stream ..
发布时间:2021-11-14 23:52:38 其他开发

Apache Zeppelin &Spark Streaming:Twitter 示例仅适用于本地

我刚刚从 http://zeppelin-project.org/docs/tutorial/tutorial.html(“流数据教程"部分).我现在遇到的问题是该应用程序似乎只能在本地工作.如果我将 Spark 解释器设置“master"从“local[*]"更改为“spark://master:7077",当我执行相同的 SQL 语句时,应用程序将不再产生任何结果.我做错了什么吗?我已经重新启 ..
发布时间:2021-11-14 23:51:44 其他开发

如何在火花中处理这个

我使用的是 spark-sql 2.4.x 版本,Cassandra-3.x 版本使用的是 datastax-spark-cassandra-connector.与 kafka 一起. 我有一个来自 kafka 主题的财务数据的场景.data(基础数据集)包含 companyId, year , prev_year 字段信息. 如果列 year === prev_year 那么我需要加 ..
发布时间:2021-11-14 23:33:15 其他开发

SQLContext.gerorCreate 不是值

我收到错误 SQLContext.gerorCreate is not a value of object org.apache.spark.SQLContext.这是我的代码 import org.apache.spark.SparkConf导入 org.apache.spark.streaming.StreamingContext导入 org.apache.spark.streaming.S ..
发布时间:2021-11-14 23:32:16 其他开发

如何在 Scala 中使用动态键解析动态 Json

我正在尝试解析本质上是动态的 Json 结构并加载到数据库中.但是在 json 内部有动态键的情况下面临困难.下面是我的示例 json:尝试使用爆炸功能但没有帮助.这里描述了类似的东西 如何解析嵌套 JSON 结果中的动态 JSON 键? {“_id":{“planId":“5f34dab0c661d8337097afb9",“版本":{“$numberLong":“1"},“句号":{“姓名 ..
发布时间:2021-11-14 23:20:50 其他开发

将 Spark SQL 与 Spark Streaming 结合使用

尝试在 Spark 结构化流方面理解 SparkSql. Spark Session 从 kafka 主题读取事件,将数据聚合到按不同列名分组的计数并将其打印到控制台. 原始输入数据结构如下: +--------------+------------+----------+-----------+-------+-----------+--------------------+------- ..
发布时间:2021-11-14 23:13:49 其他开发

添加父列名称作为前缀以避免歧义

检查下面的代码.如果存在重复键,它将生成具有歧义的数据帧.我们应该如何修改代码以添加父列名称作为前缀. 添加了另一个包含 json 数据的列. scala>val df = 序列((77, "email1", """{"key1":38,"key3":39}""",""""{"name":";aaa","age":10}"""),(78, "email2", """{"key1":38," ..
发布时间:2021-11-14 23:11:39 其他开发

嵌套json中的结构化流不同模式

嗨,我有一个场景,传入的消息是一个 Json,它有一个标题说 tablename,数据部分有表列数据.现在我想把它写到 parquet 到单独的文件夹说 /emp 和 /dept.我可以通过基于表名聚合行在常规流中实现这一点.但是在结构化流媒体中,我无法拆分它.我如何才能在结构化流媒体中实现这一点. {"tableName":"employee","data":{"empid":1","em ..
发布时间:2021-11-14 23:11:06 其他开发

添加父列名称作为前缀以避免歧义

检查下面的代码.如果存在重复键,它将生成具有歧义的数据帧.我们应该如何修改代码以添加父列名称作为前缀. 添加了另一个包含 json 数据的列. scala>val df = 序列((77, "email1", """{"key1":38,"key3":39}""",""""{"name":";aaa","age":10}"""),(78, "email2", """{"key1":38," ..
发布时间:2021-11-14 23:09:58 其他开发

嵌套json中的结构化流不同模式

嗨,我有一个场景,传入的消息是一个 Json,它有一个标题说 tablename,数据部分有表列数据.现在我想把它写到 parquet 到单独的文件夹说 /emp 和 /dept.我可以通过基于表名聚合行在常规流中实现这一点.但是在结构化流媒体中,我无法拆分它.我如何才能在结构化流媒体中实现这一点. {"tableName":"employee","data":{"empid":1","em ..
发布时间:2021-11-14 23:09:37 其他开发

从 kafka-Spark-Streaming 读取数据时获取空集

嗨,我是 Spark Streaming 的新手.我正在尝试读取 xml 文件并将其发送到 kafka 主题.这是我的 Kafka 代码,它向 Kafka-console-consumer 发送数据. 代码: package org.apache.kafka.Kafka_Producer;导入 java.io.BufferedReader;导入 java.io.FileNotFoundEx ..
发布时间:2021-11-14 23:09:22 其他开发