spark-streaming相关内容

嵌套json中的结构化流不同模式

嗨,我有一个场景,传入的消息是一个 Json,它有一个标题说 tablename,数据部分有表列数据.现在我想把它写到 parquet 到单独的文件夹说 /emp 和 /dept.我可以通过基于表名聚合行在常规流中实现这一点.但是在结构化流媒体中,我无法拆分它.我如何才能在结构化流媒体中实现这一点. {"tableName":"employee","data":{"empid":1","em ..

Spark Streaming - 4 核和 16 核的处理时间相同.为什么?

场景:我正在用 Spark Streaming 做一些测试.大约 100 条记录的文件每 25 秒出现一次. 问题:程序中使用 local[*] 的 4 核 pc 的处理平均需要 23 秒.当我将相同的应用程序部署到具有 16 核的服务器时,我期望处理时间有所改善.但是,我看到它仍然在 16 个内核中花费相同的时间(还检查了 ubuntu 中的 cpu 使用情况,并且 cpu 正在得到充分利 ..
发布时间:2021-11-14 23:07:52 其他开发

在不停止进程的情况下刷新 Spark 实时流中的数据帧

在我的应用程序中,我从 Kafka 队列获得了一个帐户流(使用 Spark 流和 kafka) 而且我需要从 S3 获取与这些帐户相关的属性,因此我计划缓存 S3 结果数据帧,因为 S3 数据目前至少不会更新一天,将来可能会更改为 1 小时或 10 分钟.所以问题是如何在不停止进程的情况下定期刷新缓存的数据帧. **更新:我计划在 S3 中有更新时将事件发布到 kafka,使用 SNS ..

使用 scalapb 在 Spark Streaming 中解码 Proto Buf 消息时出错

这是一个 Spark Streaming 应用程序,它使用以 Proto Buf 编码的 Kafka 消息.使用 scalapb 库.我收到以下错误.请帮忙. >com.google.protobuf.InvalidProtocolBufferException: 在解析一个>协议消息,输入在一个中间意外结束>场地.这可能意味着输入已被截断或>嵌入的消息误报了自己的长度.在>com.google ..

优雅地关闭火花结构化流

有一种方法可以通过将属性 spark.streaming.stopGracefullyOnShutdown 设置为 true,然后使用 kill -SIGTERM 命令终止进程来正常关闭 Spark 流.但是,我没有看到此类选项可用于结构化流 (SQLContext.scala). 结构化流媒体的关闭过程是否不同?或者只是还没有实施? 解决方案 此功能尚未实现.但是,spark 结构 ..

尝试在 Spark Streaming 中使用持久表时出现空指针异常

我在开始时创建了“gpsLookUpTable"并保留它,这样我就不需要一遍又一遍地拉它来做映射.但是,当我尝试在 foreach 内部访问它时,我得到了空指针异常.感谢任何帮助. 以下是代码片段: def main(args: Array[String]): Unit = {val conf = new SparkConf() ...val sc = 新的 SparkContext(co ..
发布时间:2021-11-14 22:54:29 其他开发

Spark读取大文件作为输入流

我知道 spark 内置方法可以分区和读取大块文件,并使用文本文件作为 rdd 分发.但是,我正在自定义的加密文件系统中阅读此内容,而 Spark 本质上不支持该文件系统.我能想到的一种方法是读取输入流并加载多行并分发给执行程序.继续阅读,直到所有文件都加载完毕.所以没有执行器会因为内存不足错误而崩溃.可以在 spark 中做到这一点吗? 解决方案 你可以针对不同的 n 尝试lines.t ..

控制 Apache Spark 中的数据分区

数据看起来像: 列 1 列 2 列 3 列 4 第 1 行 1 行 1 行 1 第2行 第2行 第2行 第2行 第3行 第3行 第3行 第3行 第4行 第4行 第4行 第4行 第5行 第5行 第5行 第5行 第6行第6行第6行第6行 问题:我想对这些数据进行分区,假设第 1 行和第 2 行将作为一个分区处理,第 3 行和第 4 行作为另一个处理,第 5 行和第 6 行作为另 ..
发布时间:2021-11-14 22:48:54 其他开发

集成 Spark SQL 和 Spark Streaming 时出现 Not Serializable 异常

这是我的源代码,其中我从服务器端获取一些数据,它不断生成数据流.然后对于每个 RDD ,我正在应用 SQL Schema,一旦创建了这个表,我就会尝试从这个 DStream 中选择一些东西. List男性 = 新 ArrayList();JavaDStreamdata = streamingContext.socketTextStream("localhost", (port));数据打印() ..
发布时间:2021-11-14 22:48:33 Java开发

如何将流式数据集转换为 DStream?

是否可以将流式o.a.s.sql.Dataset 转换为DStream?如果是这样,如何? 我知道如何将其转换为 RDD,但它是在流上下文中. 解决方案 这是不可能的.Structured Streaming 和传统 Spark Streaming (DStreams) 使用完全不同的语义并且彼此不兼容,因此: DStream 无法转换为 Streaming Dataset. ..

如何在启动 Spark Streaming 进程时加载历史数据,并计算运行聚合

我的 ElasticSearch 集群中有一些与销售相关的 JSON 数据,我想使用 Spark Streaming(使用 Spark 1.4.1)通过 Kafka 从我的电子商务网站动态聚合传入的销售事件,以获取当前视图用户的总销售额(就收入和产品而言). 从我阅读的文档中我不太清楚的是如何在 Spark 应用程序启动时从 ElasticSearch 加载历史数据,并计算例如每个用户的总收 ..