在Spark Scala中消费来自RabbitMQ的消息时RDD为空 [英] Empty RDD while consuming messages from RabbitMQ in Spark Scala

查看:44
本文介绍了在Spark Scala中消费来自RabbitMQ的消息时RDD为空这一问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

下面是用于消费RabbitMQ消息的Spark Streaming代码。

import java.io.{ BufferedReader, InputStreamReader }
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.receiver.Receiver


/**
 * Spark Streaming receiver that consumes gzip-compressed messages from a
 * RabbitMQ queue and stores each decompressed payload as one `String` record.
 *
 * Lifecycle:
 *  - `onStart()` spawns a background thread running `receive()`.
 *  - `receive()` opens a connection, then polls the queue until the receiver
 *    is stopped, storing and acknowledging one delivery at a time.
 *  - `onStop()` closes the connection, which also unblocks the polling thread.
 *
 * NOTE(review): `ConnectionFactory`, `Connection`, `Channel` and
 * `QueueingConsumer` are expected to come from `com.rabbitmq.client`; the
 * import is not visible in this snippet and must be in scope.
 */
class CustomReceiver
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  // How long a single poll waits for a delivery before re-checking isStopped().
  private val PollTimeoutMs = 1000L

  // Public mutable state kept for backward compatibility with existing callers.
  var factory: ConnectionFactory = _
  var consumer: QueueingConsumer = _
  // Written by the receiver thread and read by onStop() on another thread.
  @volatile var rabbitMQconnection: Connection = null
  var channel: Channel = null
  var host = "****"
  var port = "****"
  var queueName = "test"
  var virtualHost = "host"
  var userName = "name"
  var password = "****"

  /** Starts the background thread that receives data; returns immediately. */
  def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /**
   * Releases the RabbitMQ connection. Closing it makes a blocked
   * `nextDelivery` call fail, so the receiving thread terminates promptly
   * instead of lingering (and leaking a consumer) across restarts.
   */
  def onStop(): Unit = closeConnection(rabbitMQconnection)

  /**
   * Connects to RabbitMQ and keeps consuming until the receiver is stopped.
   * On failure the receiver is restarted, unless it is already stopping.
   */
  private def receive(): Unit = {
    try {
      val conn = getRabbitMQConnection()
      rabbitMQconnection = conn
      try consume(conn)
      finally closeConnection(conn)
    } catch {
      case e: java.net.ConnectException =>
        if (!isStopped()) restart("Error connecting to ", e)
      // NonFatal lets OutOfMemoryError, InterruptedException etc. propagate.
      case scala.util.control.NonFatal(t) =>
        // A failure caused by onStop() closing the connection is expected.
        if (!isStopped()) restart("Error receiving data", t)
    }
  }

  /** Declares the queue, registers the consumer and runs the polling loop. */
  private def consume(conn: Connection): Unit = {
    channel = conn.createChannel()
    println("rabbitMQ Connection created")

    // queueDeclare expects a java.util.Map, not a Scala Map.
    // The arguments must be identical to those the queue was originally
    // declared with; a different x-message-ttl makes the broker reject the
    // declaration (PRECONDITION_FAILED) and nothing is ever consumed.
    val delayArgs = new java.util.HashMap[String, AnyRef]()
    delayArgs.put("x-message-ttl", Long.box(50000))

    // Arguments: queue, durable, exclusive, autoDelete, arguments.
    channel.queueDeclare(queueName, true, false, false, delayArgs)

    consumer = new QueueingConsumer(channel)
    // autoAck = false: deliveries are acknowledged manually after store(),
    // so auto-ack must be off; acking an auto-acked delivery closes the channel.
    channel.basicConsume(queueName, false, consumer)

    while (!isStopped()) {
      // The timed variant returns null on timeout, letting the loop notice a stop.
      Option(consumer.nextDelivery(PollTimeoutMs)).foreach { delivery =>
        val ty = unzip(delivery.getBody)
        println(s"at consumer : $ty")
        store(ty)
        channel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
      }
    }
  }

  /** Closes `conn` if it is non-null and still open, ignoring close failures. */
  private def closeConnection(conn: Connection): Unit =
    Option(conn).filter(_.isOpen).foreach { c =>
      try c.close()
      catch {
        // Best effort: the connection may already be shutting down.
        case scala.util.control.NonFatal(_) => ()
      }
    }

  /**
   * Creates a new RabbitMQ connection with automatic and topology recovery
   * enabled.
   *
   * NOTE(review): the credentials, host and virtual host are hard-coded here
   * and differ from the `userName`/`password`/`host`/`virtualHost` fields
   * above (all values are redacted in this snippet) — confirm which set is
   * intended.
   *
   * @return an open connection to the broker
   */
  def getRabbitMQConnection(): Connection = {
    val connectionFactory = new ConnectionFactory()
    connectionFactory.setUsername("name")
    connectionFactory.setPassword("password")
    connectionFactory.setHost("****")
    connectionFactory.setVirtualHost("*****")
    connectionFactory.setAutomaticRecoveryEnabled(true)
    connectionFactory.setTopologyRecoveryEnabled(true)
    connectionFactory.newConnection()
  }

  /**
   * Decompresses a gzip-compressed byte array into text.
   *
   * @param x gzip-compressed payload
   * @return the decompressed content decoded as UTF-8
   */
  def unzip(x: Array[Byte]): String = {
    val inputStream =
      new java.util.zip.GZIPInputStream(new java.io.ByteArrayInputStream(x))
    try {
      // Decode explicitly as UTF-8 rather than the executor's platform default.
      // NOTE(review): assumes the producer encodes payloads as UTF-8 — confirm.
      scala.io.Source.fromInputStream(inputStream, StandardCharsets.UTF_8.name).mkString
    } finally inputStream.close()
  }
}
    
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

    // Driver script: builds a 10-second micro-batch stream on top of the
    // existing SparkContext `sc` and prints what the custom receiver delivers.
    //
    // NOTE(review): a receiver permanently occupies one core, so `sc` needs
    // more than one core (e.g. local[2]) or batches are never processed —
    // confirm how `sc` was created.
    val batchInterval = Seconds(10)
    val ssc = new StreamingContext(sc, batchInterval)
    val stream = ssc.receiverStream(new CustomReceiver())

    // print(rdd) only showed the RDD object's toString, never the messages.
    // DStream.print() outputs the first elements of every batch instead.
    stream.print()

    ssc.start()
    ssc.awaitTermination()
 

如下所示,输出为空。当我改用Python的pika库(不使用Spark Streaming上下文)消费消息时,能够正常消费数据并将消息保存到存储中。

Time: 1642473060000 ms
-------------------------------------------

-------------------------------------------
Time: 1642473070000 ms
-------------------------------------------

有人能告诉我上面的代码有什么问题吗?

推荐答案

将代码中的x-message-ttl参数改为正确的值后,问题得以解决。代码里声明队列时使用的x-message-ttl值必须与RabbitMQ中该队列已声明的值一致,否则队列声明会失败,接收器也就收不到任何消息。

这篇关于在Spark Scala中消费来自RabbitMQ的消息时RDD为空的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆