dstream相关内容

如何在Scala中得到Spark Stream中两个DStream的笛卡尔乘积?

我有两个DStream。让A:DStream[X]和B:DStream[Y]。 我想得到它们的笛卡尔积,换句话说,一个新的C:DStream[(X, Y)] 包含所有X和Y值对。 我知道有一个cartesian函数用于RDDS。我只能找到this similar question,但它是Java版本,因此无法回答我的问题。 推荐答案 链接问题答案的scala等效项(忽略Time ..
发布时间:2022-03-29 20:24:56 其他开发

Pyspark - 将控制权转移出 Spark 会话 (sc)

这是关于 的后续问题 Dstream 上的 Pyspark 过滤操作 要计算一天、一小时内出现的错误消息/警告消息的数量 - 人们如何设计工作. 我尝试过的: from __future__ import print_function导入系统从 pyspark 导入 SparkContext从 pyspark.streaming 导入 StreamingContext定义计数():计 ..
发布时间:2021-06-25 18:35:38 其他开发

Dstream 上的 Pyspark 过滤操作

我一直在尝试扩展网络字数,以便能够根据特定关键字过滤行 我使用的是 spark 1.6.2 from __future__ import print_function导入系统从 pyspark 导入 SparkContext从 pyspark.streaming 导入 StreamingContext如果 __name__ == "__main__":如果 len(sys.argv) != ..
发布时间:2021-06-25 18:35:21 其他开发

DStream的笛卡尔

我使用Spark笛卡尔函数来生成N对值的列表. 然后我映射这些值以生成每个用户之间的距离度量: val cartesianUsers:org.apache.spark.rdd.RDD [(distance.classes.User,distance.classes.User)] = users.cartesian(users)cartesianUsers.map(m => manDist ..
发布时间:2021-04-08 20:22:02 其他开发

Spark:从单个DStream中获取多个DStream

有可能在spark中从单个DStream中获取多个DStream.我的用例如下:我正在从HDFS文件获取日志数据流.日志行包含一个id(id = xyz).我需要根据ID对日志行进行不同的处理.因此,我尝试为输入Dstream中的每个ID设置不同的Dstream.我在文档中找不到任何相关内容.有谁知道如何在Spark中实现此目标,或指向此目标的任何链接. 谢谢 解决方案 您不能从单个 ..
发布时间:2021-04-08 20:13:32 其他开发

如何使用Pyspark合并两个Dstream(类似于普通RDD上的.zip)

我知道我们可以在pyspark中组合两个RDD(例如R中的cbind),如下所示: rdd3 = rdd1.zip(rdd2) 我想对pyspark中的两个Dstream执行相同的操作.有可能还是其他选择? 事实上,我正在使用MLlib randomforest模型来预测使用火花流.最后,我想结合功能Dstream&一起预测Dstream以进行进一步的下游处理. 谢谢. ..
发布时间:2021-04-08 20:08:16 其他开发

如何在Apache Spark中将特征提取与DStream结合使用

我有从Kafka通过DStream到达的数据.我想执行特征提取以获得一些关键字. 我不想等待所有数据的到来(因为它打算是可能永远不会结束的连续流),所以我希望以块的形式进行提取-准确性是否会受到影响对我来说并不重要一点. 到目前为止,我整理出了类似的内容: def extractKeywords(stream: DStream[Data]): Unit = { val sp ..
发布时间:2020-09-04 06:34:15 其他开发

如何解决类型不匹配问题(预期:双,实际:股)

下面是我的函数,计算均方根误差。然而,最后一行不能因为错误类型不匹配问题,编译(预期:双,实际:单位)。我尝试过许多不同的方法来解决这个问题,但还是没有成功。任何想法? 高清calculateRMSE(输出:DSTREAM [(双人间,双人间)]):双= { VAL summse = {output.foreachRDD RDD = GT; rdd.map { ..
发布时间:2016-05-22 16:39:44 其他开发

对于在DSTREAM每个RDD我怎么将它转换为一个数组或其他一些典型的Java数据类型?

我想一个DSTREAM转换为数组,列表等这样我就可以翻译为JSON和服务于它的端点。我使用apache的火花,注射Twitter数据。我如何preform在DSTREAM 状态此操作?我似乎无法得到任何工作比其它打印()。 进口org.apache.spark._ 进口org.apache.spark.SparkContext._ 进口org.apache.spark.streaming._ 进 ..
发布时间:2016-05-22 16:03:16 其他开发