spark-streaming相关内容

Apache Zeppelin &Spark Streaming:Twitter 示例仅适用于本地

我刚刚从 http://zeppelin-project.org/docs/tutorial/tutorial.html(“流数据教程"部分).我现在遇到的问题是该应用程序似乎只能在本地工作.如果我将 Spark 解释器设置“master"从“local[*]"更改为“spark://master:7077",当我执行相同的 SQL 语句时,应用程序将不再产生任何结果.我做错了什么吗?我已经重新启 ..

Spark Java:如何将数据从 HTTP 源移动到 Couchbase 接收器?

我在 Web 服务器上有一个 .gz 文件,我想以流式方式使用该文件并将数据插入 Couchbase..gz 文件中只有一个文件,每行包含一个 JSON 对象. 由于 Spark 没有 HTTP 接收器,所以我自己写了一个(如下所示).我正在使用 Couchbase Spark 连接器 进行插入.但是,在运行时,该作业实际上并未插入任何内容.我怀疑这是由于我对 Spark 缺乏经验并且不知道 ..

在 Spark Streaming 中更改输出文件名

我正在运行一个 Spark 作业,就逻辑而言,它的性能非常好.但是,当我使用 saveAsTextFile 将文件保存在 s3 存储桶中时,我的输出文件的名称采用 part-00000、part-00001 等格式.有没有办法改变输出文件名? 谢谢. 解决方案 在 Spark 中,您可以使用 saveAsNewAPIHadoopFile 并将 hadoop 配置中的 ma​​pred ..
发布时间:2021-11-14 22:33:30 其他开发

Spark SQL + Window + Streaming 问题 - 使用 Spark 流运行时,Spark SQL 查询需要很长时间才能执行

我们期待使用 Spark Streaming(带水槽)和带窗口的 Spark SQL 实现一个用例,允许我们对一组数据执行 CEP 计算.(有关如何捕获和使用数据,请参见下文).这个想法是使用 SQL 来执行一些匹配特定条件的操作..根据每个传入的事件批次执行查询似乎很慢(随着它的进行). 这里的慢意味着说我配置了 600 秒的窗口大小和 20 秒的批处理间隔.(以每 2 秒 1 个输入的速 ..

SparkContext.getOrCreate() 目的

SparkContext 类中的 getOrCreate 方法的目的是什么?我不明白我们什么时候应该使用这种方法. 如果我有 2 个使用 spark-submit 运行的 spark 应用程序,并且在主方法中我使用 SparkContext.getOrCreate 实例化了 spark 上下文,则两个应用程序都将具有相同的上下文? 或者目的更简单,唯一的目的是当我创建一个spark应用 ..

如何从地图/过滤器/等执行异步操作(即返回未来)?

我有一个 DataSet.map 操作,需要从外部 REST API 中提取数据. REST API 客户端返回一个 Future[Int]. 是否可以让 DataSet.map 操作以某种方式异步等待这个 Future?或者我是否需要使用 Await.result 来阻塞线程?或者这只是没有完成的事情......即我应该尝试将API保存的数据加载到它自己的DataSet中,然后执行j ..
发布时间:2021-11-14 22:32:06 其他开发

即使在应用程序中设置内核时,Spark UI 也显示 0 个内核

我在从 spark master url 运行应用程序时遇到一个奇怪的问题,其中 UI 无限期地报告“WAITING"的“STATE",因为 0 个内核显示在 RUNNING APPLICATIONs 表下,无论我配置什么核心计数. 我使用以下设置配置了我的应用程序,其中 spark.max.cores = 2 &spark.default.cores = 2 &内存设置为3GB.该机器是具 ..

如何刷新表并同时进行?

我使用的是 Spark Streaming 2.1.我想定期刷新一些缓存表(由 spark 提供的数据源加载,如 parquet、MySQL 或用户定义的数据源). 如何刷新表格? 假设我加载了一些表 spark.read.format("").load().createTempView("my_table") 并且它也被缓存 spark.sql("缓存表 my_ta ..
发布时间:2021-11-14 22:21:31 其他开发

通过 apache spark 将行作为列表与组一起收集

我有一个特定的用例,我为同一客户有多个行,其中每个行对象看起来像: root-c1:BigInt-c2:字符串-c3:双倍-c4:双倍-c5:地图[字符串,整数] 现在我已经按列 c1 进行分组并将所有行收集为同一客户的列表,例如: c1, [Row1, Row3, Row4]c2, [Row2, Row5] 我试过这样做dataset.withColumn("combined", arr ..
发布时间:2021-11-14 22:19:33 Java开发

如何处理火花结构化流中的小文件问题?

我的项目中有一个场景,我正在使用 spark-sql-2.4.1 版本读取 kafka 主题消息.我能够使用结构化流处理这一天.收到数据并进行处理后,我需要将数据保存到 hdfs 存储中的相应镶木地板文件中. 我能够存储和读取镶木地板文件,我将触发时间保持在 15 秒到 1 分钟之间.这些文件非常小,因此导致文件很多. 这些 parquet 文件需要稍后通过 hive 查询读取. ..

如何使用 Spark 高效读取多个小型镶木地板文件?有CombineParquetInputFormat 吗?

Spark 生成了多个小型镶木地板文件.如何有效地处理生产者和消费者 Spark 作业上的少量镶木地板文件. 解决方案 import org.apache.hadoop.mapreduce.InputSplit;导入 org.apache.hadoop.mapreduce.RecordReader;导入 org.apache.hadoop.mapreduce.TaskAttemptCont ..

复制当前行,修改它并在spark中添加一个新行

我使用的是带有 java8 版本的 spark-sql-2.4.1v.我有一个场景,我需要复制当前行并创建另一行修改几列数据如何在 spark-sql 中实现? 例如:给定 val data = List((“20",“分数",“学校",14 ,12),(“21"、“分数"、“学校"、13、13),(“22"、“比率"、“学校"、11 ,14))val df = data.toDF(“i ..
发布时间:2021-11-14 22:07:16 其他开发

如何使用特定用户初始化 spark shell 以通过 apache spark 将数据保存到 hdfs

我正在使用 ubuntu 我使用 Intellij 使用 spark 依赖 未找到命令“spark",但可以通过以下方式安装:..(当我在 shell 中输入 spark 时) 我有两个用户 amine 和 hadoop_amine(设置了 hadoop hdfs) 当我尝试将数据帧保存到 HDFS (spark scala) 时: proceded.write.format("js ..