spark-streaming相关内容
我的建筑,从Twitter收到DStreams的应用程序,停止流上下文的唯一途径是通过回采执行。我不知道是否有设置一个时间和终止流套接字但没有回采整个应用程序的方式吗? 解决方案 您可以使用 awaitTerminationOrTimeout(长) 作为在previous答复中提到,或者你可以从其他线程手动停止流上下文: //在主线程 awaitTermination()
..
我刚刚开始四处寻找有状态的计算与星火流媒体解决方案,当我遇到了updateStateByKey()函数来了。 问题我试图解决: 万传感器产生的每分钟的二进制值。 如果连续值传感器报告是彼此不同的,我想标志,并派下来卡夫卡作为一个国家改变事件。 我的假设是,updateStateByKey()在这个例子中可以使用,但是我并不完全认识到实施相同的建议的方法。 解决方案 我假设你会得到(
..
场景:我做了一些测试火花流。有100条记录中的文件进来每25秒。 问题:处理使用承担平均得分超过23秒,4核PC的本地[*]在程序中。当我与16个内核部署相同的应用程序服务器,我期待在处理时间的改善。但是,我看到它仍然采取同时在16个核心以及(还检查了CPU占用率在Ubuntu和CPU被充分利用)。所有配置都火花提供的默认。 问题: 如果不处理与可用于数据流作业,增加核心数量减少的时间?
..
我有一个Spark流工作,从使用直接的方法卡夫卡集群读取数据。有一个在处理时间,我无法理解,而不是体现在星火UI度量一个周期性高峰。下图显示了这种模式(批处理时间为10秒): 这个问题每次作业运行时间是重复的。 有记录在卡夫卡没有数据被读取所以注意没有真正处理,执行。我期望线持平,接近最小值序列化和发送任务的执行者。 该模式是工作需要9秒(这有5秒调度延迟),接下来的工作需要5秒(无延迟调
..
我有一些问题,同时试图从卡夫卡火花流读取。 我的code是: VAL sparkConf =新SparkConf()。setMaster(“本地[2]”)。setAppName(“KafkaIngestor”) VAL SSC =新的StreamingContext(sparkConf,秒(2))VAL kafkaParams =地图[字符串,字符串]( “zookeeper.conne
..
有关结账目的,我尝试建立一个Amazon S3存储为检查点文件。 VAL checkpointDir =“S3A://bucket-name/checkpoint.txt” VAL SC =新SparkContext(CONF) sc.setLocalProperty(“spark.default.parallelism”,“30”) sc.hadoopConfiguration.set(“f
..
有没有办法一个前pression过滤含有单词“字词1”或其他的“字词2”的台词 是这样的: VAL解析度= lines.filter(行=> line.contains(“字词1”或“字词2”)) 因为这个前pression不起作用。 感谢您提前 解决方案 如果行是一个字符串最佳的选择将正则表达式: VAL模式=“字词1 |字词2”.Rlines.filter(线= GT; p
..
我想获得每2秒钟就有一个JSON线,将它们存储在具有从costum类,我创建元素的列表,并在上下文中每次执行后打印生成的列表中。所以我在做这样的事情: JavaStreamingContext SSC =新JavaStreamingContext(sparkConf,Durations.seconds(2)); JavaReceiverInputDStream<串GT; strea
..
以下是简单code键获取字数超过30秒10秒的窗口大小和幻灯片大小 进口org.apache.spark.SparkConf 进口org.apache.spark.streaming._ 进口org.apache.spark.streaming.StreamingContext._ 进口org.apache.spark.api.java.function._ 进口org.apache.spark
..
我的星火集群有1个主站和3工人(4单独的机器,每台机器1核心)和其他设置,如下面的图片,其中 spark.cores.max 设置为 3 和 spark.executor.cores 也 3 (在 PIC-1 ) 但是,当我提出我的工作,星火产业集群,从星火网络的用户界面,我可以看到只有一个执行程序用于(根据使用的内存和 RDD块在 PIC-2 ),但不是所有的执行者。在这种情况下,处理速度比我
..
我想通过星火连接到凤凰城和经开JDBC驱动程序(切为简便起见,下面全堆栈跟踪)的连接时,我不断收到以下异常: 产生的原因:抛出java.lang.ClassNotFoundException:org.apache.hadoop.hbase.ipc.controller.ClientRpcControllerFactory 在java.net.URLClassLoader的$ 1.
..
我用在变换操作的方法类似的用例href=\"https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#transformations-on-dstreams\"相对=“nofollow”>上DStreams转换的: spamInfoRDD = sc.pickleFile(...)包含垃圾信息#RDD #加入数据流的
..
我试图执行使用Twitter火花流的例子。这是我的code 公共静态无效的主要(字符串..参数){ SparkConf的conf =新SparkConf()setAppName(“Spark_Streaming_Twitter”)setMaster(“本地”)。 JavaSparkContext SC =新JavaSparkContext(CONF); JavaS
..
我们正在使用Apache 1.5.1星火和kafka_2.10-0.8.2.1和卡夫卡DirectStream API从卡夫卡用星火获取数据。 我们创造了卡夫卡的主题具有以下设置 ReplicationFactor:1和副本:1 在所有的卡夫卡实例正在运行,星火就业工作正常。当群集中的卡夫卡实例之一是向下,然而,我们得到以下再现的异常。一段时间后,重新启动我们残疾人卡夫卡的实例,并试图完成星
..
我是新手星火流的概念,从过去两天试图理解星火从插座流一直停留。我看到火花能够读取传递给插座块。然而,它没有在读出块执行任何操作。 下面是星火code foo软件包; 进口的java.io.File; 进口java.util.Arrays中; 进口java.util.LinkedList中; 进口的java.util.List;进口org.apache.spark.SparkConf; 进口o
..
我已经创建了蟒蛇火花应用卡夫卡流,并且可以解析自带通过它的任何文本。 kafkaStream = KafkaUtils.createStream(SSC,zkQuorum,“火花流媒体消费”,{话题:1}) 我想改变这是能够从卡夫卡的话题解析Avro的消息。当从一个文件解析的Avro的消息,我这样做:读者= DataFileReader(开放(“customer.avro”,“R”),Da
..
问题: 我需要比较的 I [日] _row 与的[I-1] th_row COL2 (由 COL1 )。如果 item_i 的的我[日] _row 和 ITEM_ [I-1] _row 不同,那么我需要1递增 ITEM_ [I-1] 的计数。 + -------------- + | COL1 COL2 | + -------------- + | row_1 ITEM_1 | | row_
..
我刚才复制的火花流wodcount蟒蛇code和使用火花提交运行星火集群中的单词计数蟒蛇code,但它显示了以下错误: py4j.protocol.Py4JJavaError:同时呼吁o23.loadClass发生错误。 :抛出java.lang.ClassNotFoundException:org.apache.spark.streaming.kafka.KafkaUtilsPythonHe
..
我使用的卡夫卡星火流获取流数据。 VAL线= KafkaUtils.createDirectStream [阵列(字节),字符串,DefaultDe codeR,StringDe codeR](SSC,kafkaConf,集(专题))。图(_._ 2) 我用这DSTREAM和处理RDDS VAL输出= lines.foreachRDD(RDD = GT; rdd.f
..
我想实现火花斯卡拉流应用。我想使用FILESTREAM()方法来处理Hadoop的目录中新来的文件,以及旧文件present。 我按照FILESTREAM()实现从下面从计算器两个线程为: 斯卡拉星火流FILESTREAM 火花流FILESTREAM 我使用FILESTREAM()如下: VAL linesRDD = ssc.fileStream [LongWritable,文
..