Spark Streaming MQTT

Question

I've been using spark to stream data from kafka and it's pretty easy.

I thought using the MQTT utils would also be easy, but it is not for some reason.

I'm trying to execute the following piece of code.

  val sparkConf = new SparkConf(true).setAppName("amqStream").setMaster("local")
  val ssc = new StreamingContext(sparkConf, Seconds(10))

  val actorSystem = ActorSystem()
  implicit val kafkaProducerActor = actorSystem.actorOf(Props[KafkaProducerActor])

  MQTTUtils.createStream(ssc, "tcp://localhost:1883", "AkkaTest")
    .foreachRDD { rdd =>
      println("got rdd: " + rdd.toString())
      rdd.foreach { msg =>
        println("got msg: " + msg)
      }
    }

  ssc.start()
  ssc.awaitTermination()

The weird thing is that Spark logs the message I sent in the console, but not my println.

It logs something like this:

19:38:18.803 [RecurringTimer - BlockGenerator] DEBUG o.a.s.s.receiver.BlockGenerator - Last element in input-0-1435790298600 is SOME MESSAGE

Answer

foreach is a distributed action, so your println may be executing on the workers. If you want to see some of the messages printed out locally, you could use the built-in print function on the DStream, or, instead of printing inside your foreachRDD, collect (or take) some of the elements back to the driver and print them there. Hope that helps and best of luck with Spark Streaming :)
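
A minimal sketch of those two options, reusing the broker URL and topic from the question (the stream value name is just illustrative):

  val stream = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "AkkaTest")

  // Option 1: print() is an output operation that runs on the driver and
  // prints the first few elements of each batch to the driver's console.
  stream.print()

  // Option 2: keep foreachRDD, but pull a small sample back to the driver
  // with take() and print it there, rather than calling println inside
  // rdd.foreach, which executes on the workers.
  stream.foreachRDD { rdd =>
    rdd.take(10).foreach(msg => println("got msg: " + msg))
  }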
