Socket connection and ActorSystem


Problem description



I have an application that uses Akka, and now I want to connect to it via a socket connection. For this I use a mechanism similar to the one from the Scala page. But if I try to tell while I have an open OutputStream, no message is received by the target.

Here is my source code:

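import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import java.net.{ServerSocket, Socket, SocketException}

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
// The `25.0 seconds` syntax also needs the duration DSL; its import path depends
// on the Akka version (akka.util.duration._ in Akka 2.0, scala.concurrent.duration._ from 2.1 on).

object Connector {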

  def main(args: Array[String]) {
    val port = 1337
    val conf = ConfigFactory.load
    val system = ActorSystem("SDDB", conf.getConfig("SDDB"))
    val master = system.actorOf(Props[TestActor])
    master ! "a"

    try {
      val listener = new ServerSocket(port)
      println("listening on port: " + port)
      while (true)
        new ConnectionThread(listener accept, master).start
      listener close
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: " + port + ".")
        System.exit(-1)
    } finally {
      system.shutdown
    }
  }
}

case class ConnectionThread(socket: Socket, master: ActorRef) 
  extends Thread("ConnectionThread") {

  private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
  private implicit var id = 0L
  private implicit val timeout = Timeout(25.0 seconds)

  master ! "b"

  override def run {
    master ! "c"
    try{
      master ! "d"
      val in = new ObjectInputStream(socket getInputStream)
      master ! "e"
      val out = new ObjectOutputStream(socket getOutputStream)

      out writeObject("listening")
      out flush

      master ! "f"
      val command = in.readObject.asInstanceOf[String]
      println("client sent: '" + command + "'")
      // process the command

      master ! "g"
      out.writeObject("EOF")
      out.flush

      out.close
      in.close
      socket.close
    } catch {
      case e: SocketException =>
      case e: IOException => e printStackTrace
    }
  }
}

class TestActor extends Actor with ActorLogging{

  log info("TestActor running")

  def receive = {
    case s: String =>
      log info("received: " + s)
  }

}

I get the output:

listening on port: 1337
[INFO] TestActor running
[INFO] received: a
[INFO] received: b
[INFO] received: c
[INFO] received: d

Now I expected it to go on until g, but instead I get:

client sent: 'select content from testdata on 2012-07-06'

I figured out that it works until I open a stream on the socket, probably because tell and ask are socket-based as well and use the output stream of the socket that the thread runs in. Afterwards the socket connection works, but I am not able to send any message to the actor system.
There is no way for me to drop the Connector and the ConnectionThread. How can I fix it?
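
Separately from the messaging problem, the `// process the command` step in the listing is left open, and the `Select_*` pattern is declared but never used. The following is only a sketch of how a command such as the one shown in the output could be matched with that pattern. The names `CommandSketch`, `SelectCommand`, `Select` and `parse` are hypothetical and do not come from the post.

```scala
object CommandSketch {
  // Same pattern as Select_* in the listing; the name SelectCommand is hypothetical.
  private val SelectCommand =
    """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r

  // Hypothetical result type for a parsed command.
  final case class Select(column: String, table: String, year: Int, month: Int, day: Int)

  // Returns Some(Select(...)) if the whole command matches the pattern, else None.
  def parse(command: String): Option[Select] = command match {
    case SelectCommand(column, table, y, m, d) =>
      Some(Select(column, table, y.toInt, m.toInt, d.toInt))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(parse("select content from testdata on 2012-07-06"))
    println(parse("drop everything"))
  }
}
```

Returning an `Option` keeps malformed input from throwing inside the connection thread; what to do with a parsed command is left to the application.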

Solution

I must admit that I did not completely understand the example from the documentation. But I figured out that using a ConnectionHelper instead of directly addressing the ActorRef works pretty well.
I changed my code to the following:

object Connector {

  def main(args: Array[String]) {
    val port = 1337
    val conf = ConfigFactory.load
    val system = ActorSystem("SDDB", conf.getConfig("SDDB"))

    //    val master = system.actorOf(Props[TestActor], "master")
    //    master ! "a"

    try {
      val listener = new ServerSocket(port)
      println("listening on port: " + port)
      while (true)
      //        new ConnectionThread(listener accept, master.asInstanceOf[TestActor]).start
        new ConnectionThread(listener accept, system).start
      listener close
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: " + port + ".")
        System.exit(-1)
    } finally {
      //      master ! PoisonPill
      system.shutdown
    }
  }

}

case class ConnectionThread(socket: Socket, sys: ActorSystem) 
  extends Thread("ConnectionThread") {

  private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
  private implicit var id = 0L
  private implicit val timeout = Timeout(25.0 seconds)
  private val conHelper = new ConnectionHelper

  override def run {
    try {
      val out = new ObjectOutputStream(socket getOutputStream)
      val in = new ObjectInputStream(socket getInputStream)

      conHelper tell "funzt"
      out writeObject ("Hi")
      out.flush
      val command = in.readObject.asInstanceOf[String]
      println("received: " + command)
      out writeObject ("test")
      out.flush
      out writeObject ("EOF")
      out.flush

      out.close
      in.close
      socket.close
    }
  }

  private class ConnectionHelper {
    val tester = sys.actorOf(Props[TestActor])

    def tell(s: String) { tester ! s }

  }

}

I don't really understand why this works and the code from my question does not. I welcome all explanations.
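
One visible difference between the two versions is the order in which the object streams are created: the question opens the ObjectInputStream first, while the solution opens the ObjectOutputStream first. The ObjectInputStream constructor blocks until it has read the stream header written by the peer's ObjectOutputStream, so opening the output stream first and flushing it is the safer order. Whether this is related to the missing messages is not established by the post. The sketch below uses no Akka at all and only illustrates the stream order over a loopback socket; `StreamOrderSketch`, `serve`, `request` and `roundTrip` are hypothetical names.

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.net.{ServerSocket, Socket}
import java.util.concurrent.atomic.AtomicReference

object StreamOrderSketch {

  // Server side of one connection: create the output stream first and flush
  // its header, so the peer's ObjectInputStream constructor can return.
  def serve(socket: Socket): String = {
    val out = new ObjectOutputStream(socket.getOutputStream)
    out.flush()
    val in = new ObjectInputStream(socket.getInputStream)

    out.writeObject("listening")
    out.flush()
    val command = in.readObject().asInstanceOf[String]
    out.writeObject("EOF")
    out.flush()

    out.close()
    in.close()
    socket.close()
    command
  }

  // Hypothetical client: same order (output first), sends one command and
  // returns the two replies it reads (the greeting and the end marker).
  def request(port: Int, command: String): List[String] = {
    val socket = new Socket("localhost", port)
    val out = new ObjectOutputStream(socket.getOutputStream)
    out.flush()
    val in = new ObjectInputStream(socket.getInputStream)

    val greeting = in.readObject().asInstanceOf[String]
    out.writeObject(command)
    out.flush()
    val end = in.readObject().asInstanceOf[String]

    socket.close()
    List(greeting, end)
  }

  // Runs one round trip over a loopback socket on an ephemeral port.
  def roundTrip(command: String): (String, List[String]) = {
    val listener = new ServerSocket(0)
    val received = new AtomicReference[String]("")
    val server = new Thread(new Runnable {
      def run(): Unit = received.set(serve(listener.accept()))
    })
    server.start()
    val replies = request(listener.getLocalPort, command)
    server.join()
    listener.close()
    (received.get, replies)
  }

  def main(args: Array[String]): Unit = {
    val (received, replies) = roundTrip("select content from testdata on 2012-07-06")
    println("server received: '" + received + "'")
    println("client received: " + replies.mkString(", "))
  }
}
```

If both peers instead construct their ObjectInputStream first, each waits for a header the other has not yet sent, and the connection hangs.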
