spark-streaming相关内容

在集群模式下与Spark-Submit共享配置文件

我在开发期间一直在“客户端”模式下运行我的Spark作业。我使用“--file”与执行器共享配置文件。驱动程序正在本地读取配置文件。现在,我想在“集群”模式下部署作业。我现在无法与驱动程序共享配置文件。 例如,我将配置文件名作为Extra Java Options传递给驱动程序和执行器。我正在使用SparkFiles.get()读取文件 val configFile = org.a ..
发布时间:2022-08-08 17:34:31 其他开发

S3上的Spark Text文件流

文件名是否应该包含一个编号,以供teFileStream拾取?只有当文件名包含数字时,我的程序才会拾取新文件。忽略所有其他文件,即使它们是新文件。需要更改什么设置才能拾取所有文件吗?请帮帮忙 推荐答案 花了几个小时分析堆栈跟踪后,我发现问题出在S3地址。我提供的是“s3://myBucket”,它适用于Spark 1.6和Scala 2.10.5。在Spark 2.0(和Scala 2 ..
发布时间:2022-07-03 19:02:32 其他开发

为什么当我发送两个输入流时,Spark Streaming停止工作?

我正在开发一个Spark流应用程序,其中我需要使用来自两个服务器的输入流,每个服务器每秒向Spark上下文发送一条JSON消息。 我的问题是,如果我只在一个流上执行操作,一切都运行得很好。但如果我有来自不同服务器的两个流,那么Spark在可以打印任何东西之前冻结,并且只有在两个服务器都发送了它们必须发送的所有JSON消息时(当它检测到socketTextStream没有接收数据时)才开始重新 ..
发布时间:2022-07-03 18:45:03 Java开发

电光独立版:传输请求处理程序:调用RpcHandler时出错-在不同计算机/VM上启动工作程序时

我在这方面完全是新手,如果有明显的错误,请原谅。 精确错误: 《在奴隶》: Info TransportClientFactory:69毫秒后成功创建到/10.2.10.128:7077的连接(0毫秒用于引导) 警告工作进程:无法连接到主10.2.10.128:7077 在Master: 资讯大师:我被选为领袖!新状态:活着 错误TransportRequestHandler:在RPC ..
发布时间:2022-03-29 20:34:34 其他开发

SparkConext.text文件可以与自定义接收器一起使用吗?

我正在尝试实现一个使用自定义接收器从SQS读取消息的流作业。每封邮件都包含对S3文件的单个引用,然后我希望读取、解析该文件并将其存储为ORC。 以下是我到目前为止拥有的代码: val sc = new SparkContext(conf) val streamContext = new StreamingContext(sc, Seconds(5)) val sqs = stream ..
发布时间:2022-03-29 20:29:21 其他开发

如何在Scala中得到Spark Stream中两个DStream的笛卡尔乘积?

我有两个DStream。让A:DStream[X]和B:DStream[Y]。 我想得到它们的笛卡尔积,换句话说,一个新的C:DStream[(X, Y)] 包含所有X和Y值对。 我知道有一个cartesian函数用于RDDS。我只能找到this similar question,但它是Java版本,因此无法回答我的问题。 推荐答案 链接问题答案的scala等效项(忽略Time ..
发布时间:2022-03-29 20:24:56 其他开发

如何使用mapWithState提取超时会话

我正在更新代码,以便从updateStateByKey切换到mapWithState,以便基于2分钟的超时获得用户会话(2分钟仅用于测试目的)。每个会话应在超时之前聚合会话内的所有流数据(JSON字符串)。 这是我的旧代码: val membersSessions = stream.map[(String, (Long, Long, List[String]))](eventRecor ..
发布时间:2022-03-29 20:23:25 其他开发

找不到具有Python类的火花流异常

我正在做一个项目,使用Spark Streaming将数据从CSV文件批量加载到HBase。我使用的代码如下(改编自here): def bulk_load(rdd): conf = {#removed for brevity} keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBy ..
发布时间:2022-03-29 20:17:35 Python

Spark 2.0.0打包了SBT-Assembly的流作业缺少Scala运行时方法

在电光流媒体2.0.0作业中使用->,或者使用spark-streaming-kafka-0-8_2.11v2.0.0,并通过spark-submit提交时,出现以下错误: 线程“Main”中的异常org.apache.spk.SparkException:作业因阶段失败而中止:阶段72.0中的任务0失败1次,最近的失败:阶段72.0中丢失的任务0.0(TID37,localhost):ja ..
发布时间:2022-03-29 20:15:57 其他开发

火花流中的ML模型更新

我已经通过Spark批处理作业在HDFS中持久化了机器学习模型,我正在使用它。基本上,ML模型是从Spark驱动程序向所有执行器广播的。 有人能建议我如何在不停止Spark流作业的情况下实时更新模型吗?基本上,当有更多的数据点可用时,将创建一个新的ML模型,但不知道如何将新模型发送给Spark Executor。 请求发布一些示例代码。 问候, Deepak。 推荐答案 最好的方 ..
发布时间:2022-03-29 20:08:51 其他开发

Spark Streaming:如何在Python中获取已处理文件的文件名

我是Spark的新手(老实说,我也是Python的新手),如果我错过了一些明显的东西,请原谅我。 我正在使用Spark和Python进行文件流传输。在我所做的第一个示例中,Spark正确地侦听给定的目录并计算文件中出现的单词,因此我知道一切都是在侦听该目录的情况下工作的。 现在,我正在尝试获取为进行审计而处理的文件的名称。我在这里读到 http://mail-archives.us.apa ..
发布时间:2022-03-29 20:07:41 Python

Spark Streaming中如何处理旧数据和删除已处理的数据

我们正在运行一个Spark Stream作业,该作业从一个目录(使用TextFileStream)检索文件。 我们关注的一个问题是,作业已关闭,但文件仍在添加到目录中。 一旦作业重新启动,这些文件就不会被拾取(因为它们不是新的或在作业运行时更改),但我们希望处理它们。 1)有解决方案吗?有没有办法跟踪已处理的文件,以及是否可以“强制”拾取较旧的文件? 2)是否有方法删除已处理的文件? ..
发布时间:2022-03-29 20:05:01 其他开发

电光结构化流媒体:处理负载是否影响输入率/数字输入记录?

我当前的结构化流应用程序写入一个巨大的Delta表。当我(停止流)并将其指向写入全新的增量表时: 它变得更快-批量持续时间减少了近四分之一 投入率几乎提高了3倍 我知道它可能会变得更快,因为它在旧的/更大的表上执行的任何聚合/写入在新表上都不需要。但投入率的变化我希望有人能解释一下? 源为Azure EventHubs。 谢谢! 推荐答案 回答我自己的问题: ..