在Spark Scala中消费来自RabbitMQ的消息时RDD为空 [英] Empty RDD while consuming messages from RabbitMQ in Spark Scala
本文介绍了在Spark Scala中消费来自RabbitMQ的消息时RDD为空的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
下面是消费RabbitMQ消息的Spark Streaming代码。
import java.io.{ BufferedReader, InputStreamReader }
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.receiver.Receiver
/**
 * Spark Streaming receiver that consumes gzip-compressed messages from a
 * RabbitMQ queue and stores every decompressed payload as one String record.
 *
 * The receiver keeps polling the queue until Spark stops it, acknowledges a
 * message only after it has been handed to `store`, and releases the channel
 * and connection in `onStop` so that a restart opens a fresh connection.
 *
 * NOTE(review): ConnectionFactory, Connection, Channel and QueueingConsumer are
 * assumed to be the RabbitMQ Java client types (com.rabbitmq.client) imported
 * elsewhere in this file -- confirm.
 */
class CustomReceiver
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  import java.io.ByteArrayInputStream
  import java.util.zip.GZIPInputStream
  import scala.util.Try
  import scala.util.control.NonFatal

  // Connection state. Populated by the receiving thread, cleared by onStop().
  var factory: ConnectionFactory = _
  var consumer: QueueingConsumer = _
  var rabbitMQconnection: Connection = null
  var channel: Channel = null

  // Broker settings (placeholders; set real values before starting the stream).
  var host = "****"
  var port = "****"
  var queueName = "test"
  var virtualHost = "host"
  var userName = "name"
  var password = "****"

  /**
   * Value sent as the queue's `x-message-ttl` argument (milliseconds).
   * It must be identical to the value the queue was originally declared with,
   * otherwise the broker rejects the re-declaration and nothing is consumed.
   */
  var messageTtlMs: Long = 50000L

  // How long one poll waits for a delivery before re-checking isStopped().
  private val pollTimeoutMs = 1000L

  /** Starts the background thread that pulls messages from RabbitMQ. */
  def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  /**
   * Releases the channel and connection. The receiving thread leaves its loop
   * on its own once isStopped() returns true (or once the closed channel
   * makes its pending poll fail).
   */
  def onStop(): Unit = {
    // Best effort: a failure while closing must not prevent the stop/restart.
    Option(channel).foreach(ch => Try(ch.close()))
    Option(rabbitMQconnection).foreach(conn => Try(conn.close()))
    channel = null
    rabbitMQconnection = null
    consumer = null
  }

  /** Connects to RabbitMQ and stores deliveries until the receiver is stopped. */
  private def receive(): Unit = {
    try {
      if (rabbitMQconnection == null) {
        rabbitMQconnection = getRabbitMQConnection()
        println("rabbitMQ Connection created")
      }
      val activeChannel = rabbitMQconnection.createChannel()
      channel = activeChannel

      // queueDeclare takes a java.util.Map, so build one explicitly rather
      // than relying on an implicit Scala-to-Java collection conversion.
      val queueArgs = new java.util.HashMap[String, AnyRef]()
      queueArgs.put("x-message-ttl", Long.box(messageTtlMs))
      activeChannel.queueDeclare(queueName, true, false, false, queueArgs)

      val deliveries = new QueueingConsumer(activeChannel)
      consumer = deliveries
      // autoAck = false: every delivery is acknowledged manually below.
      // Combining autoAck = true with basicAck acknowledges the same delivery
      // twice, which the broker treats as a channel-level protocol error.
      activeChannel.basicConsume(queueName, false, deliveries)

      while (!isStopped()) {
        // The timed poll returns null on timeout, which lets the loop
        // notice a stop request even when the queue is idle.
        Option(deliveries.nextDelivery(pollTimeoutMs)).foreach { delivery =>
          val message = unzip(delivery.getBody)
          println("at consumer : " + message)
          store(message)
          activeChannel.basicAck(delivery.getEnvelope.getDeliveryTag, false)
        }
      }
    } catch {
      // Failures caused by onStop() closing the connection are expected and
      // must not trigger a restart, hence the isStopped() guards.
      case e: java.net.ConnectException =>
        if (!isStopped()) restart("Error connecting to " + host, e)
      case NonFatal(t) =>
        if (!isStopped()) restart("Error receiving data", t)
    }
  }

  /**
   * Opens a new RabbitMQ connection using the receiver's configuration fields
   * (userName, password, host, virtualHost and, when numeric, port).
   *
   * @return a freshly opened connection with automatic and topology recovery enabled
   */
  def getRabbitMQConnection(): Connection = {
    val connectionFactory = new ConnectionFactory()
    connectionFactory.setUsername(userName)
    connectionFactory.setPassword(password)
    connectionFactory.setHost(host)
    connectionFactory.setVirtualHost(virtualHost)
    // `port` is a String; apply it only when it is a valid number, otherwise
    // the client's default port is kept.
    Try(port.trim.toInt).foreach(p => connectionFactory.setPort(p))
    connectionFactory.setAutomaticRecoveryEnabled(true)
    connectionFactory.setTopologyRecoveryEnabled(true)
    factory = connectionFactory
    connectionFactory.newConnection()
  }

  /**
   * Decompresses a gzip payload and decodes it as UTF-8 text.
   *
   * @param x gzip-compressed bytes
   * @return the decompressed content as a String
   */
  def unzip(x: Array[Byte]): String = {
    val gzipStream = new GZIPInputStream(new ByteArrayInputStream(x))
    try scala.io.Source.fromInputStream(gzipStream)(scala.io.Codec.UTF8).mkString
    finally gzipStream.close()
  }
}
import org.apache.spark._
import org.apache.spark.storage._
import org.apache.spark.streaming._
import org.apache.spark.streaming.receiver._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
// Driver script: wires CustomReceiver into a streaming context and prints a
// sample of every micro-batch.
// NOTE(review): `sc` is assumed to be the SparkContext provided by the shell
// or notebook this snippet runs in -- confirm.
val batchInterval = Seconds(10)
val ssc = new StreamingContext(sc, batchInterval)
val stream = ssc.receiverStream(new CustomReceiver())
stream.foreachRDD { rdd =>
  // print(rdd) only showed the RDD's toString, never the received records.
  // Print a bounded sample instead so a large batch is not pulled to the driver.
  val sample = rdd.take(20)
  println("records in sample: " + sample.length)
  sample.foreach(println)
}
ssc.start()
ssc.awaitTermination()
如下所示,输出为空。当我改用Python的pika库(不使用Spark Streaming上下文)消费消息时,可以正常消费数据并将消息保存到存储中。
Time: 1642473060000 ms
-------------------------------------------
-------------------------------------------
Time: 1642473070000 ms
-------------------------------------------
有人能告诉我上面的代码有什么问题吗?
推荐答案
将x-message-ttl改为正确的值后,问题得以解决。代码中queueDeclare传入的x-message-ttl必须与RabbitMQ中该队列已声明的值完全一致,否则队列声明会被拒绝,接收器也就无法消费到任何消息。
这篇关于在Spark Scala中消费来自RabbitMQ的消息时RDD为空的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文