Spark not able to fetch events from Amazon Kinesis


Question


I have recently been trying to get Spark to read events from Kinesis, but I am having problems receiving the events. While Spark is able to connect to Kinesis and get metadata from it, it is not able to get events. It always fetches zero elements.

There are no errors, just empty results. Spark is able to get metadata (e.g. the number of shards in Kinesis).

I have followed these guides [1 & 2] to get it working but have not had much luck yet. I have also tried a couple of suggestions from SO [3]. The cluster has sufficient resources/cores available.

We have seen a Protobuf version conflict between Spark and Kinesis, which could also be a cause of this behavior: Spark uses protobuf-java 2.5.0, while Kinesis probably uses protobuf-java-2.6.1.jar.
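One way to test the protobuf hypothesis is to check which protobuf-java jar the JVM actually loads. The following is a minimal sketch that relies only on the JDK reflection API; the object name `WhichJar` is hypothetical, and `com.google.protobuf.Message` is used simply because it is a class present in every protobuf-java jar.

```scala
// Hypothetical helper: reports the jar/directory a class was loaded from.
object WhichJar {
  def locationOf(className: String): Option[String] =
    try {
      val cls = Class.forName(className)
      // Bootstrap (JDK) classes have no CodeSource, so guard against null.
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    } catch {
      case _: ClassNotFoundException => None // class not on the classpath
      case _: LinkageError           => None // found but failed to link
    }

  def main(args: Array[String]): Unit = {
    val name = "com.google.protobuf.Message"
    val where = locationOf(name).getOrElse("<not found or bootstrap class>")
    println(s"$name loaded from: $where")
  }
}
```

On the driver this prints the location of the winning jar, whose file name usually includes its version. Executors can have a different classpath from the driver, so the same call may also need to run inside a Spark task (for example in a `map` over a small RDD) to see what the executors load.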

I'm wondering whether anyone has come across this behavior, or has got Spark working with Kinesis.

I have tried both Spark 1.5.0 and Spark 1.6.0.

  1. http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
  2. https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
  3. Apache Spark Kinesis Sample not working (Stack Overflow question)

Solution

Answering my own question:

I have had some success with the Spark Kinesis integration; the key turned out to be unionStreams.foreachRDD.

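There are two overloads of foreachRDD available:

  • unionStreams.foreachRDD { rdd => ... }, which takes an RDD[T] => Unit function
  • unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => ...), which takes an (RDD[T], Time) => Unit function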

For some reason, the first one does not return any results, but switching to the second one returns the results as expected. I have yet to explore why.
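Which of the two overloads gets called is decided by the Scala compiler from the number of parameters in the lambda you pass. The sketch below illustrates only that selection, using a hypothetical stand-in class `ToyStream` instead of Spark's DStream (its `foreachBatch` methods mirror the shapes of the two `foreachRDD` overloads, with a batch index standing in for `Time`). It does not reproduce or explain the empty-result behavior with Kinesis.

```scala
// Hypothetical stand-in for a DStream; NOT part of Spark.
// Each inner Seq plays the role of one micro-batch RDD.
final class ToyStream[T](batches: Seq[Seq[T]]) {
  // Same shape as foreachRDD(foreachFunc: RDD[T] => Unit)
  def foreachBatch(f: Seq[T] => Unit): Unit =
    batches.foreach(f)

  // Same shape as foreachRDD(foreachFunc: (RDD[T], Time) => Unit);
  // the batch index stands in for the batch Time.
  def foreachBatch(f: (Seq[T], Long) => Unit): Unit =
    batches.zipWithIndex.foreach { case (b, i) => f(b, i.toLong) }
}

object OverloadDemo {
  def main(args: Array[String]): Unit = {
    val stream = new ToyStream(Seq(Seq("a", "b"), Seq.empty[String], Seq("c")))

    // One-parameter lambda: only the Function1 overload matches.
    stream.foreachBatch { batch => println(s"count=${batch.size}") }

    // Two-parameter lambda: only the Function2 overload matches.
    stream.foreachBatch((batch: Seq[String], time: Long) =>
      println(s"time=$time count=${batch.size} isEmpty=${batch.isEmpty}"))
  }
}
```

Because the choice is made at compile time by lambda arity, the two call forms in the answer invoke different `foreachRDD` methods; anything beyond that (why one returned no data) is outside what this toy shows.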

I have added a code snippet below for reference.

Also consider changing the Kinesis ASL dependency version; this helped me as well:

"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0", // Doesn't work
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.4.1", // Works

Hope it helps someone :)

Thanks everyone for help.

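import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.kinesis.KinesisUtils

// Assumed to be defined earlier, as in KinesisWordCountASL:
// ssc (StreamingContext), numStreams, consumerName, streamName,
// endpointUrl, regionName, kinesisCheckpointInterval (Duration)

// Create numStreams receiver-based Kinesis input DStreams.
val kinesisStreams = (0 until numStreams).map { _ =>
  KinesisUtils.createStream(
    ssc,
    consumerName,                          // KCL app name, used for DynamoDB checkpointing
    streamName,
    endpointUrl,
    regionName,
    InitialPositionInStream.TRIM_HORIZON,  // if no checkpoint exists, start at the oldest record
    kinesisCheckpointInterval,
    StorageLevel.MEMORY_AND_DISK_2
  )
}

// Merge the per-receiver streams into a single DStream[Array[Byte]].
val unionStreams = ssc.union(kinesisStreams)

println("========================")
println(s"Num of streams: ${numStreams}")
println("========================")

/* unionStreams.foreachRDD { // Doesn't work!!
  rdd =>
    println(rdd.count())
    println("rdd isEmpty: " + rdd.isEmpty())
} */

// The (RDD, Time) overload of foreachRDD: this one works.
unionStreams.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
  println(rdd.count())
  println("rdd isEmpty: " + rdd.isEmpty())
})

ssc.start()
ssc.awaitTermination()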
