spark-streaming相关内容

带有流源的查询必须使用 writeStream.start(); 执行;

我正在尝试使用 Spark 结构化流式传输从 Kafka 读取数据并预测表单传入数据.我正在使用我使用 Spark ML 训练过的模型. val spark = SparkSession.builder().appName("Spark SQL 基本示例").master("本地").getOrCreate()导入 spark.implicits._val toString = udf((pay ..

pyspark:ml + 流媒体

根据 结合 Spark Streaming + MLlib 可以制作一个对 spark 中输入流的预测. 给定示例(适用于我的集群)的问题是 testData 是正确格式的给定权限. 我正在尝试根据数据字符串设置客户端 服务器 tcp 交换.我不知道如何以正确的格式转换字符串. 虽然这有效: sep = ";"str_recue = '0.0;0.1;0.2;0.3;0 ..
发布时间:2021-11-14 21:08:42 Python

Spark Streaming - 基于过滤器参数拆分输入流的最佳方法

我目前尝试创建某种监控解决方案 - 一些数据被写入 kafka,我使用 Spark Streaming 读取这些数据并对其进行处理. 为了预处理机器学习和异常检测的数据,我想根据一些过滤器参数拆分流.到目前为止,我已经了解到 DStreams 本身不能分成几个流. 我主要面临的问题是许多算法(如 KMeans)只采用连续数据而不是离散数据,例如url 或其他一些字符串. 我的理想 ..

Spark SQL 删除空格

我有一个简单的 Spark 程序,它读取一个 JSON 文件并发出一个 CSV 文件.在 JSON 数据中,值包含前导和尾随空格,当我发出 CSV 时,前导和尾随空格消失了.有没有办法可以保留这些空间.我尝试了很多选项,例如 ignoreTrailingWhiteSpace 、 ignoreLeadingWhiteSpace 但没有运气 input.json {"key" : "k1", ..

加入查找数据集后进行多列值查找

我正在使用 spark-sql-2.4.1v 如何根据列的值进行各种连接我需要为给定的值列获取 map_val 列的多个查找值,如下所示. 示例数据: val data = List((“20"、“分数"、“学校"、“2018-03-31"、14、12),(“21"、“分数"、“学校"、“2018-03-31"、13、13),(“22"、“率"、“学校"、“2018-03-31"、11、1 ..
发布时间:2021-11-12 05:47:11 其他开发

在pyspark中读取json文件

我是 PySpark 的新手,下面是我来自 kafka 的 JSON 文件格式. {“标题":{"平台":"atm",“版本":“2.0"}“细节":[{"abc":"3",“定义":“4"},{"abc":"5",“定义":“6"},{"abc":"7",“定义":“8"}]} 如何详细阅读所有 "abc" "def" 的值并将其添加到这样的新列表中 [(1,2),(3,4),(5,6),( ..
发布时间:2021-11-12 05:45:24 其他开发

Spark DataFrame:orderBy 之后的 groupBy 是否保持该顺序?

我有一个具有以下结构的 Spark 2.0 数据帧 example: id,小时,计数id1, 0, 12id1, 1, 55..id1, 23, 44id2, 0, 12id2, 1, 89..id2, 23, 34等等. 它包含每个 id 的 24 个条目(一天中的每个小时一个),并使用 orderBy 函数按 id、小时排序. 我创建了一个聚合器groupConcat: def ..

java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$ 运行 TwitterPopularTags 时

我是 Spark 流和 Scala 的初学者.对于项目要求,我试图运行 github 中存在的 TwitterPopularTags 示例.由于 SBT 程序集对我不起作用,而且我不熟悉 SBT,因此我尝试使用 Maven 进行构建.经过大量最初的小问题,我能够创建 jar 文件.但是在尝试执行它时,我收到以下错误.有人能帮我解决这个问题吗? 线程“main"中的异常 java.lang.NoC ..

Spark Dataframe 验证 Parquet 写入的列名

我正在使用从 JSON 事件流转换而来的数据帧处理事件,最终以 Parquet 格式写出. 但是,一些 JSON 事件在键中包含空格,我想在将其转换为 Parquet 之前从数据框中记录和过滤/删除此类事件,因为 ;{}()\n\t= 被视为 Parquet 架构 (CatalystSchemaConverter) 中的特殊字符,如[1] 下面中所列,因此不应在列名中使用. 如何在 D ..

如何将每个 DStream 保存/插入到永久表中

关于将输出 Dstream 插入永久 SQL 表的“Spark Streaming"问题,我一直面临着问题.我想将每个输出 DStream(来自激发进程的单个批次)插入到一个唯一的表中.我一直在使用 Python 和 Spark 1.6.2 版. 在我代码的这一部分,我有一个由一个或多个 RDD 组成的 Dstream,我想将其永久插入/存储到 SQL 表中,而不会丢失每个处理批次的任何结果 ..

如何更新火花流中的广播变量?

我相信,我有一个相对常见的 Spark 流用例: 我有一个对象流,我想根据一些参考数据对其进行过滤 最初,我认为使用广播变量来实现这是一件非常简单的事情: public void startSparkEngine {广播refdataBroadcast= sparkContext.broadcast(getRefData());最终 JavaDStream过滤流 = objectSt ..
发布时间:2021-11-12 05:30:19 Java开发

为什么 Spark 应用程序会因“线程“main"中的“异常"而失败?java.lang.NoClassDefFoundError: ...StringDeserializer"?

我正在开发一个使用 Spark 和 Java 侦听 Kafka 流的 Spark 应用程序. 我使用 kafka_2.10-0.10.2.1. 我为 Kafka 属性设置了各种参数:bootstrap.servers、key.deserializer、value.deserializer 等 我的应用程序编译正常,但是当我提交它时,它失败并显示以下错误: 线程“main"中的异 ..