spark-streaming相关内容

如何控制从 Spark DataFrame 写入的输出文件的数量?

使用 Spark 流从 Kafka 主题读取 Json 数据. 我使用 DataFrame 处理数据,稍后我希望将输出保存到 HDFS 文件.问题是使用: df.write.save("append").format("text") 产生许多文件,有些文件很大,有些甚至是 0 字节. 有没有办法控制输出文件的数量?另外,为了避免“相反"问题,有没有办法同时限制每个文件的大小,以便在当前 ..

Stream-Static Join:如何定期刷新(非持久化/持久化)静态数据帧

我正在构建一个 Spark Structured Streaming 应用程序,我正在其中执行批处理流连接.并且批处理数据的来源会定期更新. 因此,我计划定期对该批数据进行持久化/非持久化. 以下是我用来持久化和取消持久化批处理数据的示例代码. 流程: 读取批处理数据 保留批处理数据 每隔一小时,取消持久化数据并读取批处理数据并再次持久化. 但是,我没有看到批处理 ..

如何在 Spark Streaming 应用程序中异步写入行以加快批处理执行速度?

我有一个 Spark 作业,我需要在每个微批次中编写 SQL 查询的输出.写入在性能上是一项代价高昂的操作,并且会导致批处理执行时间超过批处理间隔. 我正在寻找提高写入性能的方法. 在单独的线程中异步执行写入操作(如下所示)是一个不错的选择吗? 这是否会导致任何副作用,因为 Spark 本身以分布式方式执行? 是否有其他/更好的方法来加快写入速度? //创建固定线程池执行 ..

Spark - 流式数据帧/数据集不支持非基于时间的窗口;

我需要使用内部选择和分区来编写 Spark sql 查询.问题是我有 AnalysisException.我已经在这上面花了几个小时,但使用其他方法我没有成功. 异常: 线程“main"org.apache.spark.sql.AnalysisException 中的异常:流式数据帧/数据集不支持非基于时间的窗口;;窗口 [sum(cast(_w0#41 as bigint)) windo ..
发布时间:2021-11-14 21:57:27 Java开发

从缓存中删除火花数据帧

我将 Spark 1.3.0 与 python api 一起使用.在转换巨大的数据帧时,我缓存了许多 DF 以加快执行速度; df1.cache()df2.cache() 一旦某个数据帧的使用结束并且不再需要,我该如何从内存中删除 DF(或取消缓存它??)? 例如,df1 用于整个代码,而 df2 用于少数转换,之后就不再需要了.我想强行删除 df2 以释放更多内存空间. 解决方案 ..
发布时间:2021-11-14 21:50:00 其他开发

如何制作(Spark1.6)saveAsTextFile 来附加现有文件?

在 SparkSQL 中,我使用 DF.wirte.mode(SaveMode.Append).json(xxxx),但是这种方法获取这些文件 like 文件名太复杂随机,无法用api获取.所以想用saveAstextfile,因为文件名不复杂,不规则,但不知道如何在同一个目录中追加文件?感谢为您的时间. 解决方案 在 Spark 1.5 上工作,我认为这是正确的用法.. data ..

Spark SQL 删除空格

我有一个简单的 Spark 程序,它读取一个 JSON 文件并发出一个 CSV 文件.在 JSON 数据中,值包含前导和尾随空格,当我发出 CSV 时,前导和尾随空格消失了.有没有办法可以保留这些空间.我尝试了很多选项,例如 ignoreTrailingWhiteSpace 、 ignoreLeadingWhiteSpace 但没有运气 input.json {"key" : "k1", ..

仅当数据框中存在列时才应用条件

我在 java8 中使用 spark-sql-2.4.1v.我有一个场景,如果列出现在给定的数据框列列表中,我需要执行某些操作 我有如下示例数据框,数据框的列会根据在数据库表上执行的外部查询而有所不同. val data = List((“20"、“分数"、“学校"、“2018-03-31"、14、12、20),(“21"、“分数"、“学校"、“2018-03-31"、13、13、21), ..

加入查找数据集后进行多列值查找

我正在使用 spark-sql-2.4.1v 如何根据列的值进行各种连接我需要为给定的值列获取 map_val 列的多个查找值,如下所示. 示例数据: val data = List((“20"、“分数"、“学校"、“2018-03-31"、14、12),(“21"、“分数"、“学校"、“2018-03-31"、13、13),(“22"、“率"、“学校"、“2018-03-31"、11、1 ..
发布时间:2021-11-14 21:25:29 其他开发

Spark DataFrame:orderBy 之后的 groupBy 是否保持该顺序?

我有一个具有以下结构的 Spark 2.0 数据帧 example: id,小时,计数id1, 0, 12id1, 1, 55..id1, 23, 44id2, 0, 12id2, 1, 89..id2, 23, 34等等. 它包含每个 id 的 24 个条目(一天中的每个小时一个),并使用 orderBy 函数按 id、小时排序. 我创建了一个聚合器groupConcat: def ..

Spark Dataframe 验证 Parquet 写入的列名

我正在使用从 JSON 事件流转换而来的数据帧处理事件,最终以 Parquet 格式写出. 但是,一些 JSON 事件在键中包含空格,我想在将其转换为 Parquet 之前从数据框中记录和过滤/删除此类事件,因为 ;{}()\n\t= 被视为 Parquet 架构 (CatalystSchemaConverter) 中的特殊字符,如[1] 下面中所列,因此不应在列名中使用. 如何在 D ..

如何将每个 DStream 保存/插入到永久表中

关于将输出 Dstream 插入永久 SQL 表的“Spark Streaming"问题,我一直面临着问题.我想将每个输出 DStream(来自激发进程的单个批次)插入到一个唯一的表中.我一直在使用 Python 和 Spark 1.6.2 版. 在我代码的这一部分,我有一个由一个或多个 RDD 组成的 Dstream,我想将其永久插入/存储到 SQL 表中,而不会丢失每个处理批次的任何结果 ..

为什么 foreachRDD 不使用 StreamingContext.textFileStream 用新内容填充 DataFrame?

我的问题是,当我将代码更改为流模式并将数据框放入 foreach 循环时,数据框显示为空表!我不填!我也不能把它放到 assembler.transform() 中.错误是: Error:(38, 40) not enough arguments for method map: (mapFunc: String => U)(implicit evidence$2: scala.reflect.C ..