spark-streaming相关内容

更改Spark Streaming中的输出文件名称

我正在运行一个Spark工作,只要逻辑进行得非常好。但是,当我使用saveAsTextFile将文件保存在s3存储桶中时,我的输出文件的名称格式为part-00000,part-00001等。有没有办法改变输出文件名? 谢谢。 在Spark中,您可以使用 saveAsNewAPIHadoopFile ,并在hadoop配置中设置 mapreduce.output.basename 参数以更 ..

在Yarn集群上运行的Spark作业java.io.FileNotFoundException:文件不退出,因为文件在主节点上退出

我对Spark很新。我尝试搜索,但我无法得到一个适当的解决方案。我已经在两个盒子(一个主节点和另一个工作节点)上安装了hadoop 2.7.2,我已经按照以下链接设置了集群 http://javadev.org/docs/hadoop/centos/6/installation/multi-node- installation-on-centos-6-non-sucure-mode / 我以ro ..
发布时间:2018-05-31 18:45:00 分布式计算/Hadoop

Hive为HDFS中的每个插入创建多个小文件

以下内容已经实现 Kafka Producer使用Spark Streaming从twitter中提取数据。 > Kafka Consumer将数据导入Hive External table(在HDFS上)。 虽然目前工作状况良好。 只有一个我正面临的问题,而我的应用程序将数据插入Hive表中时,它会为每个文件的每行数据创建一个小文件。下面是 代码 //定义从 中读取哪 ..
发布时间:2018-05-31 18:42:33 分布式计算/Hadoop

Spark Streaming:应用程序运行状况

我有一个 Kafka 基于 Spark Streaming 应用程序,每5分钟运行一次。查看运行5天后的统计数据,有几点意见: 处理时间从30秒到50秒逐渐增加。快照如下所示,其中突出显示了处理时间表: 垃圾收集日志显示如下所示: 问题: 有没有一个很好的解释,为什么处理时间已经大幅增加,即使事件数量大致相同(在最后一个波谷期间)? 我差不多70 GC日志在每个处理周期结束时 ..

Spark Scala流式CSV

我是新的在Spark / Scala。我知道如何加载CSV文件: sqlContext.read.format(“csv”) 以及如何读取文本流和文件流: scc.textFileStream(“”“file:/// c:\path\filename”“”); scc.fileStream [LongWritable,Text,TextInputFormat] ..
发布时间:2017-02-24 23:52:57 Office

为什么我的Spark流媒体应用这么慢?

我有一个有4个节点的集群:3个Spark节点和1个Solr节点。我的CPU是8核,我的内存是32 GB,磁盘空间是SSD。我使用cassandra作为我的数据库。我的数据量是22GB后6小时,我现在有大约3,400万行,这应该在5分钟内阅读。 但它已经无法在这段时间内完成任务。我的未来计划是在5分钟内读取 100百万行。我不知道我可以增加或做得更好,以实现这个结果,以及实现我的未来目标。这是 ..

星火流:Rdd.Count()没有返回一个有效的数字

在我的应用我有一个包含一些数据的两个JavaDStreams。我试图计数在每个JavaDStream行数不过我在日志中接收的结果不是数字,而是一种完全不同的对象,其输出到日志中。我在做什么错在这里? code: //地图评分结果集的鸣叫 JavaDStream< Tuple5<长,弦乐,浮动,浮动,字符串>>结果= scoredTwee ..
发布时间:2016-05-22 16:50:02 Java开发

火花是工作流既" CP"和" MV"

我使用的火花流 我的程序不断地从一个文件夹的Hadoop读取流。问题是如果我复制到我的Hadoop文件夹(Hadoop的FS -copyFromLocal)的火花作业启动,但如果我这样做移动(Hadoop的FS -mv / hadoopsourcePath / * /的DestinationPath / ),这是行不通的。 它是火花流的限制? 我要激发流相关的另一个问题: 可以激发流挑特定 ..
发布时间:2016-05-22 16:49:59 其他开发

火花写数据分流到HBase的与Python封锁saveAsNewAPIHadoopDataset

我使用的火花流蟒蛇读卡夫卡和写入HBase的,我发现saveAsNewAPIHadoopDataset阶段很容易被堵塞的工作。正如下面的图片: 你会发现时间是这个舞台上8个小时。通过HBase的API并火花写数据或直接写信通过HDFS API吗? 解决方案 一个有点晚了,但这里是一个类似的例子 要保存RDD HBase的:搜索结果 考虑包含一行的RDD: {“ID”:3,“名”: ..
发布时间:2016-05-22 16:48:10 其他开发

星火流在S3目录

所以,我必须通过亚马逊的Kinesis被流数千个事件到SQS然后倒入一个S3目录。大约每隔10分钟,将创建一个新的文本文件,从室壁运动数据转储到S3。我想成立星火流,使其流被倾倒入S3新的文件。现在我有 进口org.apache.spark.streaming._ VAL currentFileStream = ssc.textFileStream(“S3://桶/目录/ EVENT_NAME ..

增加语言过滤器叽叽喳喳popularhashtags - 斯卡拉

我是新来的星火和Scala。我跑了星火流作业Twitter的流行哈希标签。 我增加了一些词的过滤器,并能够过滤掉鸣叫: VAL过滤=阵列(“火花”,“大数据”) VAL流= TwitterUtils.createStream(SSC,无,过滤器) 同样我想添加一个语言过滤器,这样只有英文推特流。 Twitter4j有轨道()和位置。它有一个语言过滤器?如果是这样,它是如何在斯卡拉工作? ..
发布时间:2016-05-22 16:47:07 其他开发

空指针异常当试图使用持续表中星火流

我创建“gpsLookUpTable”开头,并坚持它,这样我就不需要把它一遍又一遍地做映射。然而,当我尝试访问它里面的foreach我得到空指针异常。任何帮助是AP preciated感谢。 下面是code片段: 高清主(参数:数组[字符串]):单位= {VAL的conf =新SparkConf()...VAL SC =新SparkContext(CONF) VAL SSC =新的Stream ..
发布时间:2016-05-22 16:46:11 其他开发

控制阿帕奇星火数据分区

数据如下: 第1栏第2栏第3栏第4栏搜索 行1列1行1列1结果 行2列2行2列2结果 排第3行3列3行3结果 排第4行4列4行4结果 排第5行5列5行5结果 排行6行6行6 6搜索 问题:我要分区这个数据,可以说,第1行和行2将被处理为一个分区,行3和第4行的另一个行5和第6行的另一个并创建一个JSON数据一起将它们合并列(列标题与行的数据值)。 输出应该是这样的:结果 [ ..
发布时间:2016-05-22 16:45:45 其他开发

数据同时写入星火流输出到HDFS跳过

我在竞选每10秒一个Spark流应用程序,它的任务是从卡夫卡消耗数据,转换并存储到基于密钥HDFS。即,每个独特的密钥的文件。我使用了Hadoop的saveAsHadoopFile()API来存储输出,我看到一个文件被每一个独特的密钥生成,但问题是,只有一行被存储为每一个独特的密钥虽然DSTREAM有更多的行相同的密钥。 例如,请考虑以下DSTREAM它有一个独特的密钥, 键值 ===== ..
发布时间:2016-05-22 16:45:33 Java开发

试图了解火花流窗口

我调查星火流作为反欺诈服务我建立一个解决方案,但我在努力弄清楚究竟是如何将其应用到我的用例。用例是:从用户会话数据流,和一个风险评分被计算为一个给定的用户,数据10秒收集该用户之后。我计划使用2秒的间隔批次的时间,但需要从完整的10第二个窗口中使用的数据。起初,updateStateByKey()似乎是完美的解决方案,因为我可以建立使用该系统收集事件UserRisk对象。麻烦的是,我不知道如何告诉 ..
发布时间:2016-05-22 16:43:55 其他开发