消耗TCP流并将其重定向到另一个接收器(使用Akka流) [英] Consume TCP stream and redirect it to another Sink (with Akka Streams)

查看:101
本文介绍了消耗TCP流并将其重定向到另一个接收器(使用Akka流)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试使用Akka 2.4.3将TCP流重定向/转发到另一个接收器。
该程序应打开服务器套接字,侦听传入的连接,然后使用tcp流。我们的发件人不希望收到我们的答复,因此我们从不发回任何东西-我们只消耗流。在对tcp流进行成帧之后,我们需要将字节转换成更有用的内容并将其发送到接收器。

I try to redirect/forward a TCP stream to another Sink with Akka 2.4.3. The program should open a server socket, listen for incoming connections and then consume the tcp stream. Our sender does not expect/accept replies from us so we never send back anything - we just consume the stream. After framing the tcp stream we need to transform the bytes into something more useful and send it to the Sink.

到目前为止,我尝试了以下操作,但是我特别努力如何不将tcp数据包发送回发送方并正确连接接收器。

I tried the following so far but i struggle especially with the part how to not sending tcp packets back to the sender and to properly connect the Sink.

import scala.util.Failure
import scala.util.Success

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.scaladsl.Flow

object TcpConsumeOnlyStreamToSink {
  implicit val system = ActorSystem("stream-system")
  private val log = Logging(system, getClass.getName)    

  //The Sink
  //In reality this is of course a real Sink doing some useful things :-)
  //The Sink accept types of "SomethingMySinkUnderstand"
  val mySink = Sink.ignore;

  def main(args: Array[String]): Unit = {
    //our sender is not interested in getting replies from us
    //so we just want to consume the tcp stream and never send back anything to the sender
    val (address, port) = ("127.0.0.1", 6000)
    server(system, address, port)
  }

  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()
    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)

      conn handleWith Flow[ByteString]
      //this is neccessary since we use a self developed tcp wire protocol
      .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
      //here we want to map the raw bytes into something our Sink understands
      .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
      //here we like to connect our Sink to the Tcp Source
      .to(mySink) //<------ NOT COMPILING
    }


    val tcpSource = Tcp().bind(address, port)
    val binding = tcpSource.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.terminate()
    }

  }

  class SomethingMySinkUnderstand(x:String) {

  }
}

更新:将其添加到build.sbt文件中以获得必要的Deps

Update: Add this to your build.sbt file to get necessary deps

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"


推荐答案

handleWith 期望 Flow ,即一个具有未连接的入口和未连接的出口的盒子。您有效地提供了来源,因为您已将 Flow Sink 通过使用操作。

handleWith expects a Flow, i.e. a box with an unconnected inlet and an unconnected outlet. You effectively provide a Source, because you connected the Flow with a Sink by using the to operation.

我认为您可以执行以下操作:

I think you could do the following:

conn.handleWith(
  Flow[ByteString]
    .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
    .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
    .alsoTo(mySink)
    .map(_ => ByteString.empty)
    .filter(_ => false) // Prevents sending anything back
)

这篇关于消耗TCP流并将其重定向到另一个接收器(使用Akka流)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆