Spark Scala UDP receive on listening port


Problem Description

The example in the Spark Streaming Programming Guide (http://spark.apache.org/docs/latest/streaming-programming-guide.html) lets me receive data packets over a TCP stream by listening on port 9999:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))


// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

I am able to send data over TCP by creating a data server on my Linux system with $ nc -lk 9999.

Question

I need to receive a stream from an Android phone streaming over UDP, but in Scala/Spark
val lines = ssc.socketTextStream("localhost", 9999)
receives TCP streams only.

How can I receive UDP streams in a similarly simple manner using Scala + Spark and create a Spark DStream?

Solution

There isn't anything built in, but it's not too much work to get it done yourself. Here is a simple solution I made based on a custom UdpSocketInputDStream[T]:

import java.io._
import java.net.{DatagramPacket, DatagramSocket, InetAddress, SocketException}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

import scala.reflect.ClassTag
import scala.util.control.NonFatal

class UdpSocketInputDStream[T: ClassTag](
                                          _ssc: StreamingContext,
                                          host: String,
                                          port: Int,
                                          bytesToObjects: InputStream => Iterator[T],
                                          storageLevel: StorageLevel
                                        ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new UdpSocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

class UdpSocketReceiver[T: ClassTag](host: String,
                                     port: Int,
                                     bytesToObjects: InputStream => Iterator[T],
                                     storageLevel: StorageLevel) extends Receiver[T](storageLevel) {

  var udpSocket: DatagramSocket = _

  override def onStart(): Unit = {

    try {
      udpSocket = new DatagramSocket(port, InetAddress.getByName(host))
    } catch {
      case e: SocketException =>
        restart(s"Error binding to port $port", e)
        return
    }

    // Start the thread that receives data from the socket
    new Thread("Udp Socket Receiver") {
      setDaemon(true)

      override def run() {
        receive()
      }
    }.start()
  }

  /** Receive a datagram and store its parsed contents until the receiver is stopped */
  def receive() {
    try {
      val buffer = new Array[Byte](2048)

      // Create a packet to receive data into the buffer
      val packet = new DatagramPacket(buffer, buffer.length)

      udpSocket.receive(packet)

      val iterator = bytesToObjects(new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
      // Store everything the converter parses out of this single packet
      while (!isStopped() && iterator.hasNext) {
        store(iterator.next())
      }

      if (!isStopped()) {
        restart("Udp socket data stream had no more data")
      }
    } catch {
      case NonFatal(e) =>
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }

  override def onStop(): Unit = {
    synchronized {
      if (udpSocket != null) {
        udpSocket.close()
        udpSocket = null
      }
    }
  }
}
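
As written, receive() stores the contents of a single datagram and then restarts the receiver for the next one. A sketch of a variant that keeps reading datagrams in a loop on one receiver thread (my own adaptation, not from the original answer, assuming the same bytesToObjects converter and buffer size) could look like this:

  def receive() {
    try {
      val buffer = new Array[Byte](2048)
      val packet = new DatagramPacket(buffer, buffer.length)
      while (!isStopped()) {
        // receive() shrinks the packet length to the last datagram's size,
        // so reset it before every call to avoid truncating later packets
        packet.setLength(buffer.length)
        udpSocket.receive(packet)
        val iterator = bytesToObjects(
          new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
        while (iterator.hasNext) {
          store(iterator.next())
        }
      }
    } catch {
      case NonFatal(e) if !isStopped() =>
        restart("Error receiving data", e)
    }
  }

Which behaviour fits better depends on how often the phone sends datagrams; restarting the receiver for every packet adds noticeable overhead at high rates.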

In order to get a udpSocketStream method on StreamingContext itself, we enrich it with an implicit class:

import java.io.InputStream

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

import scala.reflect.ClassTag

object Implicits {
  implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {
    def udpSocketStream[T: ClassTag](host: String,
                                     port: Int,
                                     converter: InputStream => Iterator[T],
                                     storageLevel: StorageLevel): InputDStream[T] = {
      new UdpSocketInputDStream(ssc, host, port, converter, storageLevel)
    }
  }
}

And this is how you call it:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.reflect.ClassTag

object TestRunner {
  import Implicits._

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext("local[*]", "udpTest")
    val ssc = new StreamingContext(sparkContext, Seconds(4))

    val stream = ssc.udpSocketStream("localhost", 
                                     3003, 
                                     bytesToLines, 
                                     StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def bytesToLines(inputStream: InputStream): Iterator[String] = {
    val dataInputStream = new BufferedReader(
      new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    new NextIterator[String] {
      protected override def getNext(): String = {
        val nextValue = dataInputStream.readLine()
        if (nextValue == null) {
          finished = true
        }
        nextValue
      }

      protected override def close() {
        dataInputStream.close()
      }
    }
  }

  abstract class NextIterator[U] extends Iterator[U] {
    protected var finished = false
    private var gotNext = false
    private var nextValue: U = _
    private var closed = false

    override def next(): U = {
      if (!hasNext) {
        throw new NoSuchElementException("End of stream")
      }
      gotNext = false
      nextValue
    }

    override def hasNext: Boolean = {
      if (!finished) {
        if (!gotNext) {
          nextValue = getNext()
          if (finished) {
            closeIfNeeded()
          }
          gotNext = true
        }
      }
      !finished
    }

    def closeIfNeeded() {
      if (!closed) {
        closed = true
        close()
      }
    }

    protected def getNext(): U
    protected def close()
  }
}

Most of this code is taken from the SocketInputDStream[T] provided by Spark; I simply reused it. I also took the code for the NextIterator used by bytesToLines; all it does is consume the lines from the packet and transform them into Strings. If you have more complex logic, you can provide your own implementation as the converter: InputStream => Iterator[T] argument.
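
For instance, a hypothetical converter that parses "sensorId,value" lines into (String, Double) pairs (the record format here is invented purely for illustration) might look like:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

// Hypothetical converter: turns each "sensorId,value" line of a packet
// into a (String, Double) pair; adapt the parsing to whatever the phone sends
def bytesToReadings(inputStream: InputStream): Iterator[(String, Double)] = {
  val reader = new BufferedReader(
    new InputStreamReader(inputStream, StandardCharsets.UTF_8))
  Iterator
    .continually(reader.readLine())
    .takeWhile(_ != null)  // readLine() returns null at the end of the packet
    .map { line =>
      val Array(id, value) = line.split(",", 2)
      (id.trim, value.trim.toDouble)
    }
}

It would then be passed in place of bytesToLines: ssc.udpSocketStream("localhost", 3003, bytesToReadings, StorageLevel.MEMORY_AND_DISK_SER_2).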

Testing it with a simple UDP packet:

echo -n "hello hello hello!" >/dev/udp/localhost/3003
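
The /dev/udp path is a bash-only feature; if it isn't available, a minimal JVM-side sender sketch using java.net.DatagramSocket (host and port assumed to match the stream above) does the same thing:

import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets

// Sends one UDP datagram to the receiver, equivalent to the echo command above
object UdpTestSender {
  def main(args: Array[String]): Unit = {
    val socket = new DatagramSocket()
    val payload = "hello hello hello!".getBytes(StandardCharsets.UTF_8)
    val packet = new DatagramPacket(
      payload, payload.length, InetAddress.getByName("localhost"), 3003)
    socket.send(packet)
    socket.close()
  }
}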

Yields:

-------------------------------------------
Time: 1482676728000 ms
-------------------------------------------
hello hello hello!

Of course, this has to be tested further. There is also a hidden assumption that each buffer created from the DatagramPacket is 2048 bytes, which is perhaps something you'll want to change: any datagram longer than the buffer is silently truncated.
