使用 TCP 流并将其重定向到另一个 Sink(使用 Akka Streams) [英] Consume TCP stream and redirect it to another Sink (with Akka Streams)

查看:33
本文介绍了使用 TCP 流并将其重定向到另一个 Sink(使用 Akka Streams)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我尝试使用 Akka 2.4.3 将 TCP 流重定向/转发到另一个接收器.该程序应该打开一个服务器套接字,侦听传入的连接,然后使用 tcp 流.我们的发件人不期望/接受我们的回复,所以我们从不发回任何东西——我们只是消费流.在对 tcp 流进行成帧后,我们需要将字节转换为更有用的内容并将其发送到 Sink.

到目前为止,我尝试了以下操作,但我尤其在如何不将 tcp 数据包发送回发件人并正确连接接收器的部分上挣扎.

import scala.util.Failure导入 scala.util.Success导入 akka.actor.ActorSystem导入 akka.event.Logging导入 akka.stream.ActorMaterializer导入 akka.stream.scaladsl.Sink导入 akka.stream.scaladsl.Tcp导入 akka.stream.scaladsl.Framing导入 akka.util.ByteString导入 java.nio.ByteOrder导入 akka.stream.scaladsl.Flow对象 TcpConsumeOnlyStreamToSink {隐式 val 系统 = ActorSystem("stream-system")私有 val 日志 = 日志记录(系统,getClass.getName)//水槽//实际上这当然是一个真正的 Sink 做一些有用的事情:-)//接收器接受SomethingMySinkUnderstand"类型val mySink = Sink.ignore;def main(args: Array[String]): Unit = {//我们的发件人对我们的回复不感兴趣//所以我们只想使用tcp流并且从不向发送者发回任何东西val(地址,端口)=(127.0.0.1",6000)服务器(系统,地址,端口)}def server(system: ActorSystem, address: String, port: Int): Unit = {隐式 val sys = 系统导入 system.dispatcher隐式 val materializer = ActorMaterializer()val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>println("客户端连接自:" + conn.remoteAddress)conn handleWith Flow[ByteString]//这是必要的,因为我们使用了自己开发的tcp线路协议.via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN))//这里我们想将原始字节映射到我们的 Sink 理解的东西中.map(msg => new SomethingMySinkUnderstand(msg.utf8String))//这里我们想将我们的 Sink 连接到 Tcp Source.to(mySink)//<------ 不编译}val tcpSource = Tcp().bind(地址,端口)val 绑定 = tcpSource.to(handler).run()绑定.onComplete {案例成功(b) =>println("服务器启动,监听:" + b.localAddress)case 失败(e) =>println(s"服务器无法绑定到 $address:$port: ${e.getMessage}")system.terminate()}}类SomethingMySinkUnderstand(x:字符串){}}

更新:将此添加到您的 build.sbt 文件以获得必要的 deps

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"

解决方案

handleWith 需要一个 Flow,即一个带有未连接入口和未连接出口的盒子.你有效地提供了一个 Source,因为你使用 to 操作将 Flow 与一个 Sink 连接起来.>

我认为您可以执行以下操作:

conn.handleWith(流[字节串].via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)).map(msg => new SomethingMySinkUnderstand(msg.utf8String)).alsoTo(mySink).map(_ => ByteString.empty).filter(_ => false)//防止发回任何东西)

I try to redirect/forward a TCP stream to another Sink with Akka 2.4.3. The program should open a server socket, listen for incoming connections and then consume the tcp stream. Our sender does not expect/accept replies from us so we never send back anything - we just consume the stream. After framing the tcp stream we need to transform the bytes into something more useful and send it to the Sink.

I tried the following so far but i struggle especially with the part how to not sending tcp packets back to the sender and to properly connect the Sink.

import scala.util.Failure
import scala.util.Success

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.scaladsl.Flow

object TcpConsumeOnlyStreamToSink {
  implicit val system = ActorSystem("stream-system")
  private val log = Logging(system, getClass.getName)    

  //The Sink
  //In reality this is of course a real Sink doing some useful things :-)
  //The Sink accept types of "SomethingMySinkUnderstand"
  val mySink = Sink.ignore;

  def main(args: Array[String]): Unit = {
    //our sender is not interested in getting replies from us
    //so we just want to consume the tcp stream and never send back anything to the sender
    val (address, port) = ("127.0.0.1", 6000)
    server(system, address, port)
  }

  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()
    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)

      conn handleWith Flow[ByteString]
      //this is neccessary since we use a self developed tcp wire protocol
      .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
      //here we want to map the raw bytes into something our Sink understands
      .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
      //here we like to connect our Sink to the Tcp Source
      .to(mySink) //<------ NOT COMPILING
    }


    val tcpSource = Tcp().bind(address, port)
    val binding = tcpSource.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.terminate()
    }

  }

  class SomethingMySinkUnderstand(x:String) {

  }
}

Update: Add this to your build.sbt file to get necessary deps

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"

解决方案

handleWith expects a Flow, i.e. a box with an unconnected inlet and an unconnected outlet. You effectively provide a Source, because you connected the Flow with a Sink by using the to operation.

I think you could do the following:

conn.handleWith(
  Flow[ByteString]
    .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
    .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
    .alsoTo(mySink)
    .map(_ => ByteString.empty)
    .filter(_ => false) // Prevents sending anything back
)

这篇关于使用 TCP 流并将其重定向到另一个 Sink(使用 Akka Streams)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆