spark-streaming相关内容
我使用的是 spark-sql 2.4.x 版本,Cassandra-3.x 版本使用的是 datastax-spark-cassandra-connector.与 kafka 一起. 我有货币样本的汇率元数据如下: val ratesMetaDataDf = Seq((“欧元"、“5/10/2019"、“1.130657"、“美元")、(“欧元"、“5/9/2019"、“1.13088"
..
使用 Spark 流从 Kafka 主题读取 Json 数据. 我使用 DataFrame 处理数据,稍后我希望将输出保存到 HDFS 文件.问题是使用: df.write.save("append").format("text") 产生许多文件,有些文件很大,有些甚至是 0 字节. 有没有办法控制输出文件的数量?另外,为了避免“相反"问题,有没有办法同时限制每个文件的大小,以便在当前
..
我正在构建一个 Spark Structured Streaming 应用程序,我正在其中执行批处理流连接.并且批处理数据的来源会定期更新. 因此,我计划定期对该批数据进行持久化/非持久化. 以下是我用来持久化和取消持久化批处理数据的示例代码. 流程: 读取批处理数据 保留批处理数据 每隔一小时,取消持久化数据并读取批处理数据并再次持久化. 但是,我没有看到批处理
..
我有一个 Spark 作业,我需要在每个微批次中编写 SQL 查询的输出.写入在性能上是一项代价高昂的操作,并且会导致批处理执行时间超过批处理间隔. 我正在寻找提高写入性能的方法. 在单独的线程中异步执行写入操作(如下所示)是一个不错的选择吗? 这是否会导致任何副作用,因为 Spark 本身以分布式方式执行? 是否有其他/更好的方法来加快写入速度? //创建固定线程池执行
..
我想比较两个连续的行 i 与 col2 的 i-1(按 col1 排序>). 如果 i-th 行的 item_i 和 item_[i-1]_row 不同,我想递增item_[i-1] 的计数加 1. +--------------+|col1 col2 |+--------------+|row_1 item_1 ||row_2 item_1 ||row_3 item_2 ||row_4
..
我需要使用内部选择和分区来编写 Spark sql 查询.问题是我有 AnalysisException.我已经在这上面花了几个小时,但使用其他方法我没有成功. 异常: 线程“main"org.apache.spark.sql.AnalysisException 中的异常:流式数据帧/数据集不支持非基于时间的窗口;;窗口 [sum(cast(_w0#41 as bigint)) windo
..
带有 YARN 的 Spark 作业中 spark.yarn.executor.memoryOverhead 的值应该分配给 App 还是只是最大值? 解决方案 spark.yarn.executor.memoryOverhead 只是最大值.目标是将 OVERHEAD 计算为实际执行器内存的百分比,如 RDD 和 DataFrames 使用的那样 --executor-memory/
..
我将 Spark 1.3.0 与 python api 一起使用.在转换巨大的数据帧时,我缓存了许多 DF 以加快执行速度; df1.cache()df2.cache() 一旦某个数据帧的使用结束并且不再需要,我该如何从内存中删除 DF(或取消缓存它??)? 例如,df1 用于整个代码,而 df2 用于少数转换,之后就不再需要了.我想强行删除 df2 以释放更多内存空间. 解决方案
..
在 SparkSQL 中,我使用 DF.wirte.mode(SaveMode.Append).json(xxxx),但是这种方法获取这些文件 like 文件名太复杂随机,无法用api获取.所以想用saveAstextfile,因为文件名不复杂,不规则,但不知道如何在同一个目录中追加文件?感谢为您的时间. 解决方案 在 Spark 1.5 上工作,我认为这是正确的用法.. data
..
我有一个简单的 Spark 程序,它读取一个 JSON 文件并发出一个 CSV 文件.在 JSON 数据中,值包含前导和尾随空格,当我发出 CSV 时,前导和尾随空格消失了.有没有办法可以保留这些空间.我尝试了很多选项,例如 ignoreTrailingWhiteSpace 、 ignoreLeadingWhiteSpace 但没有运气 input.json {"key" : "k1",
..
我正在尝试使用 SparkStreaming 将流数据存储到 HDFS 中,但它不断在新文件中创建,而不是附加到一个文件或几个文件中 如果一直创建n个文件,我觉得效率不会很高 HDFS 文件系统 代码 lines.foreachRDD(f => {如果 (!f.isEmpty()) {val df = f.toDF().coalesce(1)df.write.mode(Save
..
我在 java8 中使用 spark-sql-2.4.1v.我有一个场景,如果列出现在给定的数据框列列表中,我需要执行某些操作 我有如下示例数据框,数据框的列会根据在数据库表上执行的外部查询而有所不同. val data = List((“20"、“分数"、“学校"、“2018-03-31"、14、12、20),(“21"、“分数"、“学校"、“2018-03-31"、13、13、21),
..
我正在尝试在 spark 中读取来自 kafka(版本 10)的消息并尝试打印它. 导入 spark.implicits._val spark = SparkSession.builder.appName("StructuredNetworkWordCount").config("spark.master", "本地").getOrCreate()val ds1 = spark.readStr
..
我正在使用 spark-sql-2.4.1v 如何根据列的值进行各种连接我需要为给定的值列获取 map_val 列的多个查找值,如下所示. 示例数据: val data = List((“20"、“分数"、“学校"、“2018-03-31"、14、12),(“21"、“分数"、“学校"、“2018-03-31"、13、13),(“22"、“率"、“学校"、“2018-03-31"、11、1
..
我想用重新分区编写一个大型数据帧,所以我想计算源数据帧的重新分区数. numberofpartition= {数据帧大小/default_blocksize} 所以请告诉我如何在 spark scala 中计算数据帧的大小 提前致谢. 解决方案 Usingspark.sessionState.executePlan(df.queryExecution.logical).o
..
我有一个具有以下结构的 Spark 2.0 数据帧 example: id,小时,计数id1, 0, 12id1, 1, 55..id1, 23, 44id2, 0, 12id2, 1, 89..id2, 23, 34等等. 它包含每个 id 的 24 个条目(一天中的每个小时一个),并使用 orderBy 函数按 id、小时排序. 我创建了一个聚合器groupConcat: def
..
我使用的是带有 spark v2.4.1 的 java8. 我正在尝试使用广播变量 Map 进行查找,如下所示: 输入数据: +-----+-----+-----+|代码1|代码2|代码3|+-----+-----+-----+|1 |7 |5 ||2 |7 |4 ||3 |7 |3 ||4 |7 |2 ||5 |7 |1 |+-----+-----+-----+ 预期输出: +
..
我正在使用从 JSON 事件流转换而来的数据帧处理事件,最终以 Parquet 格式写出. 但是,一些 JSON 事件在键中包含空格,我想在将其转换为 Parquet 之前从数据框中记录和过滤/删除此类事件,因为 ;{}()\n\t= 被视为 Parquet 架构 (CatalystSchemaConverter) 中的特殊字符,如[1] 下面中所列,因此不应在列名中使用. 如何在 D
..
关于将输出 Dstream 插入永久 SQL 表的“Spark Streaming"问题,我一直面临着问题.我想将每个输出 DStream(来自激发进程的单个批次)插入到一个唯一的表中.我一直在使用 Python 和 Spark 1.6.2 版. 在我代码的这一部分,我有一个由一个或多个 RDD 组成的 Dstream,我想将其永久插入/存储到 SQL 表中,而不会丢失每个处理批次的任何结果
..
我的问题是,当我将代码更改为流模式并将数据框放入 foreach 循环时,数据框显示为空表!我不填!我也不能把它放到 assembler.transform() 中.错误是: Error:(38, 40) not enough arguments for method map: (mapFunc: String => U)(implicit evidence$2: scala.reflect.C
..