spark-streaming相关内容
我正在运行一个Spark工作,只要逻辑进行得非常好。但是,当我使用saveAsTextFile将文件保存在s3存储桶中时,我的输出文件的名称格式为part-00000,part-00001等。有没有办法改变输出文件名? 谢谢。 在Spark中,您可以使用 saveAsNewAPIHadoopFile ,并在hadoop配置中设置 mapreduce.output.basename 参数以更
..
我对Spark很新。我尝试搜索,但我无法得到一个适当的解决方案。我已经在两个盒子(一个主节点和另一个工作节点)上安装了hadoop 2.7.2,我已经按照以下链接设置了集群 http://javadev.org/docs/hadoop/centos/6/installation/multi-node- installation-on-centos-6-non-sucure-mode / 我以ro
..
以下内容已经实现 Kafka Producer使用Spark Streaming从twitter中提取数据。 > Kafka Consumer将数据导入Hive External table(在HDFS上)。 虽然目前工作状况良好。 只有一个我正面临的问题,而我的应用程序将数据插入Hive表中时,它会为每个文件的每行数据创建一个小文件。下面是 代码 //定义从 中读取哪
..
我有一个 Kafka 基于 Spark Streaming 应用程序,每5分钟运行一次。查看运行5天后的统计数据,有几点意见: 处理时间从30秒到50秒逐渐增加。快照如下所示,其中突出显示了处理时间表: 垃圾收集日志显示如下所示: 问题: 有没有一个很好的解释,为什么处理时间已经大幅增加,即使事件数量大致相同(在最后一个波谷期间)? 我差不多70 GC日志在每个处理周期结束时
..
问 题 spark-submit指令,比如我要提交的python文件是几个互相import的文件,并且有的是在文件夹里的 解决方案 http://stackoverflow.com/questions/29485175/spark-submit-failed-with-spark-streaming-workdcount-python-code
..
问 题 当使用 spark streaming 2.0.0 集成 kafka 0.10.0时出现 KafkaConsumer 多线程争用的问题。 部分代码如下: val ssc = new StreamingContext(sc, Seconds(5)) val stream = KafkaUtils.createDirectStream(ssc, PreferCon
..
我是新的在Spark / Scala。我知道如何加载CSV文件: sqlContext.read.format(“csv”) 以及如何读取文本流和文件流: scc.textFileStream(“”“file:/// c:\path\filename”“”); scc.fileStream [LongWritable,Text,TextInputFormat]
..
我正在使用Spark流媒体,Apache kafka和Cassandra的项目。 我使用流式kafka集成。在kafka我有一个生产者使用这个配置发送数据: props.put(“metadata.broker.list”,KafkaProperties.ZOOKEEPER) ; props.put(“bootstrap.servers”,KafkaProperties.SERVER);
..
我有一个有4个节点的集群:3个Spark节点和1个Solr节点。我的CPU是8核,我的内存是32 GB,磁盘空间是SSD。我使用cassandra作为我的数据库。我的数据量是22GB后6小时,我现在有大约3,400万行,这应该在5分钟内阅读。 但它已经无法在这段时间内完成任务。我的未来计划是在5分钟内读取 100百万行。我不知道我可以增加或做得更好,以实现这个结果,以及实现我的未来目标。这是
..
我在下面的火花利用客户接收流的例子作为
..
在我的应用我有一个包含一些数据的两个JavaDStreams。我试图计数在每个JavaDStream行数不过我在日志中接收的结果不是数字,而是一种完全不同的对象,其输出到日志中。我在做什么错在这里? code: //地图评分结果集的鸣叫 JavaDStream< Tuple5<长,弦乐,浮动,浮动,字符串>>结果= scoredTwee
..
我使用的火花流 我的程序不断地从一个文件夹的Hadoop读取流。问题是如果我复制到我的Hadoop文件夹(Hadoop的FS -copyFromLocal)的火花作业启动,但如果我这样做移动(Hadoop的FS -mv / hadoopsourcePath / * /的DestinationPath / ),这是行不通的。 它是火花流的限制? 我要激发流相关的另一个问题: 可以激发流挑特定
..
我使用的火花流蟒蛇读卡夫卡和写入HBase的,我发现saveAsNewAPIHadoopDataset阶段很容易被堵塞的工作。正如下面的图片: 你会发现时间是这个舞台上8个小时。通过HBase的API并火花写数据或直接写信通过HDFS API吗? 解决方案 一个有点晚了,但这里是一个类似的例子 要保存RDD HBase的:搜索结果 考虑包含一行的RDD: {“ID”:3,“名”:
..
我使用的Spark流通过创建的StreamingContext作为获取来自Twitter的鸣叫: VAL SSC =新的StreamingContext(“本地[3]”,“通过twitterfeed”,分(1) ) 和如创建Twitter的数据流: VAL tweetStream = TwitterUtils.createStream(SSC,有的(新OAuthAuthorizati
..
所以,我必须通过亚马逊的Kinesis被流数千个事件到SQS然后倒入一个S3目录。大约每隔10分钟,将创建一个新的文本文件,从室壁运动数据转储到S3。我想成立星火流,使其流被倾倒入S3新的文件。现在我有 进口org.apache.spark.streaming._ VAL currentFileStream = ssc.textFileStream(“S3://桶/目录/ EVENT_NAME
..
我是新来的星火和Scala。我跑了星火流作业Twitter的流行哈希标签。 我增加了一些词的过滤器,并能够过滤掉鸣叫: VAL过滤=阵列(“火花”,“大数据”) VAL流= TwitterUtils.createStream(SSC,无,过滤器) 同样我想添加一个语言过滤器,这样只有英文推特流。 Twitter4j有轨道()和位置。它有一个语言过滤器?如果是这样,它是如何在斯卡拉工作?
..
我创建“gpsLookUpTable”开头,并坚持它,这样我就不需要把它一遍又一遍地做映射。然而,当我尝试访问它里面的foreach我得到空指针异常。任何帮助是AP preciated感谢。 下面是code片段: 高清主(参数:数组[字符串]):单位= {VAL的conf =新SparkConf()...VAL SC =新SparkContext(CONF) VAL SSC =新的Stream
..
数据如下: 第1栏第2栏第3栏第4栏搜索 行1列1行1列1结果 行2列2行2列2结果 排第3行3列3行3结果 排第4行4列4行4结果 排第5行5列5行5结果 排行6行6行6 6搜索 问题:我要分区这个数据,可以说,第1行和行2将被处理为一个分区,行3和第4行的另一个行5和第6行的另一个并创建一个JSON数据一起将它们合并列(列标题与行的数据值)。 输出应该是这样的:结果 [
..
我在竞选每10秒一个Spark流应用程序,它的任务是从卡夫卡消耗数据,转换并存储到基于密钥HDFS。即,每个独特的密钥的文件。我使用了Hadoop的saveAsHadoopFile()API来存储输出,我看到一个文件被每一个独特的密钥生成,但问题是,只有一行被存储为每一个独特的密钥虽然DSTREAM有更多的行相同的密钥。 例如,请考虑以下DSTREAM它有一个独特的密钥, 键值 =====
..
我调查星火流作为反欺诈服务我建立一个解决方案,但我在努力弄清楚究竟是如何将其应用到我的用例。用例是:从用户会话数据流,和一个风险评分被计算为一个给定的用户,数据10秒收集该用户之后。我计划使用2秒的间隔批次的时间,但需要从完整的10第二个窗口中使用的数据。起初,updateStateByKey()似乎是完美的解决方案,因为我可以建立使用该系统收集事件UserRisk对象。麻烦的是,我不知道如何告诉
..