spark-streaming相关内容

将数据从 Spark-Streaming 存储到 Cassandra 时出现问题

SparkStreaming 上下文从 RabbitMQ 读取一个流,间隔为 30 秒.我想修改 cassandra 中存在的相应行的几列的值,然后想将数据存储回 Cassandra.为此,我需要检查特定主键的行是否存在于 Cassandra 中,如果是,获取它并执行必要的操作.但问题是,我在驱动程序上创建 StreamingContext 并在 Worker 上执行操作.因此,他们无法获得 St ..
发布时间:2021-12-21 22:25:48 其他开发

如何使用`ssc.fileStream()`读取镶木地板文件?传递给 `ssc.fileStream()` 的类型是什么?

我对Spark的fileStream()方法的理解是它以三种类型为参数:Key、Value和Format.对于文本文件,适当的类型是:LongWritable、Text 和 TextInputFormat. 首先,我想了解这些类型的本质.直觉上,我猜在这种情况下 Key 是文件的行号,而 Value 是该行上的文本.因此,在以下文本文件示例中: 你好测试另一个测试 DStream 的第一 ..
发布时间:2021-12-15 19:16:45 其他开发

spark流检查点恢复非常非常慢

目标:通过 Spark 流从 Kinesis 读取数据并将数据以 Parquet 格式存储到 S3. 情况:应用程序最初运行良好,运行批次为 1 小时,处理时间平均不到 30 分钟.出于某种原因,可以说应用程序崩溃了,我们尝试从检查点重新启动.处理现在需要永远并且不会继续前进.我们试图以 1 分钟的批处理间隔测试相同的东西,处理运行良好,批处理需要 1.2 分钟才能完成.当我们从检查点恢复时,每 ..

Spark Streaming 1.6.0 中检查点/WAL 的可靠性问题

说明 我们在 Scala 中有一个 Spark Streaming 1.5.2 应用程序,它从 Kinesis Stream 读取 JSON 事件,执行一些转换/聚合并将结果写入不同的 S3 前缀.当前批处理间隔为 60 秒.我们有 3000-7000 个事件/秒.我们正在使用检查点来保护我们免于丢失聚合. 它运行良好有一段时间了,从异常中恢复,甚至集群重新启动.我们最近重新编译了 S ..

Apache Zeppelin &Spark Streaming:Twitter 示例仅适用于本地

我刚刚从 http://zeppelin-project.org/docs/tutorial/tutorial.html(“流数据教程"部分).我现在遇到的问题是该应用程序似乎只能在本地工作.如果我将 Spark 解释器设置“master"从“local[*]"更改为“spark://master:7077",当我执行相同的 SQL 语句时,应用程序将不再产生任何结果.我做错了什么吗?我已经重新启 ..

如何在 Scala 中使用动态键解析动态 Json

我正在尝试解析本质上是动态的 Json 结构并加载到数据库中.但是在 json 内部有动态键的情况下面临困难.下面是我的示例 json:尝试使用爆炸功能但没有帮助.这里描述了类似的东西 如何解析嵌套 JSON 结果中的动态 JSON 键? {“_id":{“planId":“5f34dab0c661d8337097afb9",“版本":{“$numberLong":“1"},“句号":{“姓名 ..

嵌套json中的结构化流不同模式

嗨,我有一个场景,传入的消息是一个 Json,它有一个标题说 tablename,数据部分有表列数据.现在我想把它写到 parquet 到单独的文件夹说 /emp 和 /dept.我可以通过基于表名聚合行在常规流中实现这一点.但是在结构化流媒体中,我无法拆分它.我如何才能在结构化流媒体中实现这一点. {"tableName":"employee","data":{"empid":1","em ..