spark-streaming相关内容

停止一段时间后流在星火流上下文

我的建筑,从Twitter收到DStreams的应用程序,停止流上下文的唯一途径是通过回采执行。我不知道是否有设置一个时间和终止流套接字但没有回采整个应用程序的方式吗? 解决方案 您可以使用 awaitTerminationOrTimeout(长) 作为在previous答复中提到,或者你可以从其他线程手动停止流上下文: //在主线程 awaitTermination() ..
发布时间:2016-05-22 16:21:27 其他开发

updateStateByKey()在星火流的使用,从原始事件流产生的状态变化的流

我刚刚开始四处寻找有状态的计算与星火流媒体解决方案,当我遇到了updateStateByKey()函数来了。 问题我试图解决: 万传感器产生的每分钟的二进制值。 如果连续值传感器报告是彼此不同的,我想标志,并派下来卡夫卡作为一个国家改变事件。 我的假设是,updateStateByKey()在这个例子中可以使用,但是我并不完全认识到实施相同的建议的方法。 解决方案 我假设你会得到( ..
发布时间:2016-05-22 16:20:51 其他开发

星火流 - 相同的处理时间为4芯和16芯。为什么?

场景:我做了一些测试火花流。有100条记录中的文件进来每25秒。 问题:处理使用承担平均得分超过23秒,4核PC的本地[*]在程序中。当我与16个内核部署相同的应用程序服务器,我期待在处理时间的改善。但是,我看到它仍然采取同时在16个核心以及(还检查了CPU占用率在Ubuntu和CPU被充分利用)。所有配置都火花提供的默认。 问题: 如果不处理与可用于数据流作业,增加核心数量减少的时间? ..
发布时间:2016-05-22 16:20:36 其他开发

星火流卡夫卡直接流处理时的性能尖峰

我有一个Spark流工作,从使用直接的方法卡夫卡集群读取数据。有一个在处理时间,我无法理解,而不是体现在星火UI度量一个周期性高峰。下图显示了这种模式(批处理时间为10秒): 这个问题每次作业运行时间是重复的。 有记录在卡夫卡没有数据被读取所以注意没有真正处理,执行。我期望线持平,接近最小值序列化和发送任务的执行者。 该模式是工作需要9秒(这有5秒调度延迟),接下来的工作需要5秒(无延迟调 ..
发布时间:2016-05-22 16:19:57 其他开发

星火流卡夫卡流

我有一些问题,同时试图从卡夫卡火花流读取。 我的code是: VAL sparkConf =新SparkConf()。setMaster(“本地[2]”)。setAppName(“KafkaIngestor”) VAL SSC =新的StreamingContext(sparkConf,秒(2))VAL kafkaParams =地图[字符串,字符串]( “zookeeper.conne ..
发布时间:2016-05-22 16:19:52 其他开发

由两个词过滤线星火流

有没有办法一个前pression过滤含有单词“字词1”或其他的“字词2”的台词 是这样的: VAL解析度= lines.filter(行=> line.contains(“字词1”或“字词2”)) 因为这个前pression不起作用。 感谢您提前 解决方案 如果行是一个字符串最佳的选择将正则表达式: VAL模式=“字词1 |字词2”.Rlines.filter(线= GT; p ..
发布时间:2016-05-22 16:19:09 其他开发

在星火流多个打印()方法,为什么会影响我的列表中的值?

我想获得每2秒钟就有一个JSON线,将它们存储在具有从costum类,我创建元素的列表,并在上下文中每次执行后打印生成的列表中。所以我在做这样的事情: JavaStreamingContext SSC =新JavaStreamingContext(sparkConf,Durations.seconds(2)); JavaReceiverInputDStream<串GT; strea ..
发布时间:2016-05-22 16:19:03 其他开发

星火流窗口操作

以下是简单code键获取字数超过30秒10秒的窗口大小和幻灯片大小 进口org.apache.spark.SparkConf 进口org.apache.spark.streaming._ 进口org.apache.spark.streaming.StreamingContext._ 进口org.apache.spark.api.java.function._ 进口org.apache.spark ..
发布时间:2016-05-22 16:15:39 其他开发

为什么星火未分配任务给所有执行者,但只有一个执行者?

我的星火集群有1个主站和3工人(4单独的机器,每台机器1核心)和其他设置,如下面的图片,其中 spark.cores.max 设置为 3 和 spark.executor.cores 也 3 (在 PIC-1 ) 但是,当我提出我的工作,星火产业集群,从星火网络的用户界面,我可以看到只有一个执行程序用于(根据使用的内存和 RDD块在 PIC-2 ),但不是所有的执行者。在这种情况下,处理速度比我 ..

找不到设置领袖([TOPICNNAME,0))当我们uisng在Apache中Saprk

我们正在使用Apache 1.5.1星火和kafka_2.10-0.8.2.1和卡夫卡DirectStream API从卡夫卡用星火获取数据。 我们创造了卡夫卡的主题具有以下设置 ReplicationFactor:1和副本:1 在所有的卡夫卡实例正在运行,星火就业工作正常。当群集中的卡夫卡实例之一是向下,然而,我们得到以下再现的异常。一段时间后,重新启动我们残疾人卡夫卡的实例,并试图完成星 ..
发布时间:2016-05-22 16:12:44 其他开发

星火不流在读取的块进行操作

我是新手星火流的概念,从过去两天试图理解星火从插座流一直停留。我看到火花能够读取传递给插座块。然而,它没有在读出块执行任何操作。 下面是星火code foo软件包; 进口的java.io.File; 进口java.util.Arrays中; 进口java.util.LinkedList中; 进口的java.util.List;进口org.apache.spark.SparkConf; 进口o ..
发布时间:2016-05-22 16:12:33 Java开发

星火Python的Avro的卡夫卡解串器

我已经创建了蟒蛇火花应用卡夫卡流,并且可以解析自带通过它的任何文本。 kafkaStream = KafkaUtils.createStream(SSC,zkQuorum,“火花流媒体消费”,{话题:1}) 我想改变这是能够从卡夫卡的话题解析Avro的消息。当从一个文件解析的Avro的消息,我这样做:读者= DataFileReader(开放(“customer.avro”,“R”),Da ..
发布时间:2016-05-22 16:12:17 Python

SparkStreaming:错误FILESTREAM()

我想实现火花斯卡拉流应用。我想使用FILESTREAM()方法来处理Hadoop的目录中新来的文件,以及旧文件present。 我按照FILESTREAM()实现从下面从计算器两个线程为: 斯卡拉星火流FILESTREAM 火花流FILESTREAM 我使用FILESTREAM()如下: VAL linesRDD = ssc.fileStream [LongWritable,文 ..
发布时间:2016-05-22 16:07:17 其他开发