spark-streaming相关内容
我在开发期间一直在“客户端”模式下运行我的Spark作业。我使用“--file”与执行器共享配置文件。驱动程序正在本地读取配置文件。现在,我想在“集群”模式下部署作业。我现在无法与驱动程序共享配置文件。 例如,我将配置文件名作为Extra Java Options传递给驱动程序和执行器。我正在使用SparkFiles.get()读取文件 val configFile = org.a
..
文件名是否应该包含一个编号,以供teFileStream拾取?只有当文件名包含数字时,我的程序才会拾取新文件。忽略所有其他文件,即使它们是新文件。需要更改什么设置才能拾取所有文件吗?请帮帮忙 推荐答案 花了几个小时分析堆栈跟踪后,我发现问题出在S3地址。我提供的是“s3://myBucket”,它适用于Spark 1.6和Scala 2.10.5。在Spark 2.0(和Scala 2
..
我想从火花流到几个弹性搜索索引。 我创建了成对的,当我执行groupByKey时,结果是>的元组,但为了使用ElasticSearch-Spark插件保存到ElasticSearch,我需要JavaRDD的值。 我知道有一个可以从List创建Java RDD的SparkConext.p
..
我正在开发一个Spark流应用程序,其中我需要使用来自两个服务器的输入流,每个服务器每秒向Spark上下文发送一条JSON消息。 我的问题是,如果我只在一个流上执行操作,一切都运行得很好。但如果我有来自不同服务器的两个流,那么Spark在可以打印任何东西之前冻结,并且只有在两个服务器都发送了它们必须发送的所有JSON消息时(当它检测到socketTextStream没有接收数据时)才开始重新
..
您好,我正在尝试下载以下Build.sbt文件中的spark-core、spark-streaming、twitter4j和spark-streaming-twitter: name := "hello" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spa
..
我刚刚开始使用Spark,并在Amazon EC2实例上以独立模式运行它。我正在尝试文档中提到的示例,在查看这个名为Simple App的示例时,我不断收到以下错误: NameError:未定义名称‘Numas’ from pyspark import SparkContext logFile = "$YOUR_SPARK_HOME/README.md" # Should be some
..
我正在对电光结构化流进行窗口排序: val filterWindow: WindowSpec = Window .partitionBy("key") .orderBy($"time") controlDataFrame=controlDataFrame.withColumn("Make Coffee", $"value"). withColumn("datetime"
..
下面是消费RabbitMQ消息的电光流媒体代码。 import java.io.{ BufferedReader, InputStreamReader } import java.net.Socket import java.nio.charset.StandardCharsets import org.apache.spark.SparkConf import org.apache.sp
..
我在做卡夫卡的火花直播。我想把我的RDD从卡夫卡转换成数据帧。 我正在使用以下方法。 Val SSC=new StreamingContext(“local[*]”,“KafkaExample”,Second(4)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "dofff2.dl.uk.feef
..
我在这方面完全是新手,如果有明显的错误,请原谅。 精确错误: 《在奴隶》: Info TransportClientFactory:69毫秒后成功创建到/10.2.10.128:7077的连接(0毫秒用于引导) 警告工作进程:无法连接到主10.2.10.128:7077 在Master: 资讯大师:我被选为领袖!新状态:活着 错误TransportRequestHandler:在RPC
..
我正在尝试实现一个使用自定义接收器从SQS读取消息的流作业。每封邮件都包含对S3文件的单个引用,然后我希望读取、解析该文件并将其存储为ORC。 以下是我到目前为止拥有的代码: val sc = new SparkContext(conf) val streamContext = new StreamingContext(sc, Seconds(5)) val sqs = stream
..
我有两个DStream。让A:DStream[X]和B:DStream[Y]。 我想得到它们的笛卡尔积,换句话说,一个新的C:DStream[(X, Y)] 包含所有X和Y值对。 我知道有一个cartesian函数用于RDDS。我只能找到this similar question,但它是Java版本,因此无法回答我的问题。 推荐答案 链接问题答案的scala等效项(忽略Time
..
我正在更新代码,以便从updateStateByKey切换到mapWithState,以便基于2分钟的超时获得用户会话(2分钟仅用于测试目的)。每个会话应在超时之前聚合会话内的所有流数据(JSON字符串)。 这是我的旧代码: val membersSessions = stream.map[(String, (Long, Long, List[String]))](eventRecor
..
我正在做一个项目,使用Spark Streaming将数据从CSV文件批量加载到HBase。我使用的代码如下(改编自here): def bulk_load(rdd): conf = {#removed for brevity} keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBy
..
在电光流媒体2.0.0作业中使用->,或者使用spark-streaming-kafka-0-8_2.11v2.0.0,并通过spark-submit提交时,出现以下错误: 线程“Main”中的异常org.apache.spk.SparkException:作业因阶段失败而中止:阶段72.0中的任务0失败1次,最近的失败:阶段72.0中丢失的任务0.0(TID37,localhost):ja
..
在我的方案中,我分解一个数组列,以便每行有一条记录,这样我就可以执行联接,然后将这些分解的列重新组合在一起 +--------------+-------+------------------------+ | body | ID | array_column | +--------------+-------+-------------------
..
我已经通过Spark批处理作业在HDFS中持久化了机器学习模型,我正在使用它。基本上,ML模型是从Spark驱动程序向所有执行器广播的。 有人能建议我如何在不停止Spark流作业的情况下实时更新模型吗?基本上,当有更多的数据点可用时,将创建一个新的ML模型,但不知道如何将新模型发送给Spark Executor。 请求发布一些示例代码。 问候, Deepak。 推荐答案 最好的方
..
我是Spark的新手(老实说,我也是Python的新手),如果我错过了一些明显的东西,请原谅我。 我正在使用Spark和Python进行文件流传输。在我所做的第一个示例中,Spark正确地侦听给定的目录并计算文件中出现的单词,因此我知道一切都是在侦听该目录的情况下工作的。 现在,我正在尝试获取为进行审计而处理的文件的名称。我在这里读到 http://mail-archives.us.apa
..
我们正在运行一个Spark Stream作业,该作业从一个目录(使用TextFileStream)检索文件。 我们关注的一个问题是,作业已关闭,但文件仍在添加到目录中。 一旦作业重新启动,这些文件就不会被拾取(因为它们不是新的或在作业运行时更改),但我们希望处理它们。 1)有解决方案吗?有没有办法跟踪已处理的文件,以及是否可以“强制”拾取较旧的文件? 2)是否有方法删除已处理的文件?
..
我当前的结构化流应用程序写入一个巨大的Delta表。当我(停止流)并将其指向写入全新的增量表时: 它变得更快-批量持续时间减少了近四分之一 投入率几乎提高了3倍 我知道它可能会变得更快,因为它在旧的/更大的表上执行的任何聚合/写入在新表上都不需要。但投入率的变化我希望有人能解释一下? 源为Azure EventHubs。 谢谢! 推荐答案 回答我自己的问题:
..