对于在DSTREAM每个RDD我怎么将它转换为一个数组或其他一些典型的Java数据类型? [英] For each RDD in a DStream how do I convert this to an array or some other typical Java data type?

查看:3191
本文介绍了对于在DSTREAM每个RDD我怎么将它转换为一个数组或其他一些典型的Java数据类型?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想一个DSTREAM转换为数组,列表等这样我就可以翻译为JSON和服务于它的端点。我使用apache的火花,注射Twitter数据。我如何preform在DSTREAM 状态此操作?我似乎无法得到任何工作比其它打印()

 进口org.apache.spark._
进口org.apache.spark.SparkContext._
进口org.apache.spark.streaming._
进口org.apache.spark.streaming.twitter._
进口org.apache.spark.streaming.StreamingContext._
进口TutorialHelper._
反对教程{
  高清主(参数:数组[字符串]){    //星火目录的位置
    VAL sparkHome =/选择/火花    //星火集群的网址
    VAL sparkUrl =本地[8]    //所需的JAR文件的位置
    VAL jar文件=目标/斯卡拉-2.10 / tutorial_2.10-0.1-SNAPSHOT.jar    // HDFS目录检查点
    VAL checkpointDir =/ tmp目录    使用twitter.txt //配置Twitter的凭据
    TutorialHelper.configureTwitterCredentials()    VAL SSC =新的StreamingContext(sparkUrl,教程,秒(1),sparkHome,SEQ(jar文件))    VAL过滤器=阵列(#americasgottalent,iamawesome)
    VAL鸣叫= TwitterUtils.createStream(SSC,无,过滤器)    VAL状态= tweets.map(状态=> status.getText())    VAL ARRY =阵列(firstval)
    statuses.foreachRDD {
         ARR:+ _.collect()
    }    ssc.checkpoint(checkpointDir)    ssc.start()
    ssc.awaitTermination()
  }
}


解决方案

如果您的RDD为状态你可以做。

  VAL ARR =新ArrayBuffer [字符串]();
statuses.foreachRDD {
    ARR ++ = _.collect()//现在你可以把它放在一个数组或D瓦特/电子你想用它
    ...
}

请记住,这可能最终会被这样的数据比你想在你的驱动程序,因为一个DSTREAM是巨大的。

I would like to convert a DStream into an array, list, etc. so I can then translate it to json and serve it on an endpoint. I'm using apache spark, injecting twitter data. How do I preform this operation on the Dstream statuses? I can't seem to get anything to work other than print().

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import TutorialHelper._
object Tutorial {
  def main(args: Array[String]) {

    // Location of the Spark directory 
    val sparkHome = "/opt/spark"

    // URL of the Spark cluster
    val sparkUrl = "local[8]"

    // Location of the required JAR files 
    val jarFile = "target/scala-2.10/tutorial_2.10-0.1-SNAPSHOT.jar"

    // HDFS directory for checkpointing
    val checkpointDir = "/tmp" 

    // Configure Twitter credentials using twitter.txt
    TutorialHelper.configureTwitterCredentials()

    val ssc = new StreamingContext(sparkUrl, "Tutorial", Seconds(1), sparkHome, Seq(jarFile))

    val filters = Array("#americasgottalent", "iamawesome")
    val tweets = TwitterUtils.createStream(ssc, None, filters)

    val statuses = tweets.map(status => status.getText())

    val arry = Array("firstval")
    statuses.foreachRDD {
         arr :+ _.collect()
    }

    ssc.checkpoint(checkpointDir)

    ssc.start()
    ssc.awaitTermination()
  }
}

解决方案

If your RDD is statuses you can do.

val arr = new ArrayBuffer[String]();
statuses.foreachRDD {
    arr ++= _.collect() //you can now put it in an array or d w/e you want with it
    ...
}

Keep in mind this could end up being way more data than you want in your driver since a DStream can be huge.

这篇关于对于在DSTREAM每个RDD我怎么将它转换为一个数组或其他一些典型的Java数据类型?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆