Akka cluster setup with Play framework

Question

I'm currently trying to implement a clustered Play + Akka application with an auto-discovery service. However, I seem to be running into issues with the Guice DI loader that's included with Play. The excerpt from their documentation states:

https://www.playframework.com/documentation/2.5.x/ScalaAkka#Integrating-with-Akka

While we recommend you use the built in actor system, as it sets up everything such as the correct classloader, lifecycle hooks, etc, there is nothing stopping you from using your own actor system. It is important however to ensure you do the following:

- Register a stop hook to shut the actor system down when Play shuts down.
- Pass in the correct classloader from the Play Environment, otherwise Akka won’t be able to find your application's classes.
- Ensure that either you change the location that Play reads its Akka configuration from using play.akka.config, or that you don’t read your Akka configuration from the default akka config, as this will cause problems such as when the systems try to bind to the same remote ports.
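
As a rough illustration of the three points above, a provider for a self-managed actor system might look like the sketch below. The class name `OwnActorSystemProvider`, the system name `own-system`, and the config root `own-akka` are assumptions made for this example; they do not come from the Play documentation or from the question.

```scala
import javax.inject.{Inject, Provider, Singleton}

import akka.actor.ActorSystem
import play.api.{Configuration, Environment}
import play.api.inject.ApplicationLifecycle

// Hypothetical provider, for illustration only.
@Singleton
class OwnActorSystemProvider @Inject() (
    environment: Environment,
    configuration: Configuration,
    lifecycle: ApplicationLifecycle
) extends Provider[ActorSystem] {

  // lazy val so that repeated get() calls return the same system
  private lazy val system: ActorSystem = {
    // Read Akka settings from a non-default root ("own-akka" is an assumed name)
    // so they do not clash with the settings of Play's built-in system.
    val akkaConfig = configuration.underlying.getConfig("own-akka")

    // Pass in Play's classloader so Akka can find the application's classes.
    val created = ActorSystem("own-system", akkaConfig, environment.classLoader)

    // Shut the actor system down when Play shuts down.
    lifecycle.addStopHook(() => created.terminate())
    created
  }

  override def get(): ActorSystem = system
}
```

Note that this sketch only creates the system; it does not by itself resolve which binding Guice uses for `ActorSystem`.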

I have done the configuration they recommend above; however, I can't seem to get around Play still binding its internal ActorSystemProvider from the BuiltinModule:

class BuiltinModule extends Module {
    def bindings(env: Environment, configuration: Configuration): Seq[Binding[_]] = {
        def dynamicBindings(factories: ((Environment, Configuration) => Seq[Binding[_]])*) = {
          factories.flatMap(_(env, configuration))
        }

        Seq(
          bind[Environment] to env,
          bind[ConfigurationProvider].to(new ConfigurationProvider(configuration)),
          bind[Configuration].toProvider[ConfigurationProvider],
          bind[HttpConfiguration].toProvider[HttpConfiguration.HttpConfigurationProvider],

          // Application lifecycle, bound both to the interface, and its implementation, so that Application can access it
          // to shut it down.
          bind[DefaultApplicationLifecycle].toSelf,
          bind[ApplicationLifecycle].to(bind[DefaultApplicationLifecycle]),

          bind[Application].to[DefaultApplication],
          bind[play.Application].to[play.DefaultApplication],

          bind[Router].toProvider[RoutesProvider],
          bind[play.routing.Router].to[JavaRouterAdapter],
          bind[ActorSystem].toProvider[ActorSystemProvider],
          bind[Materializer].toProvider[MaterializerProvider],
          bind[ExecutionContextExecutor].toProvider[ExecutionContextProvider],
          bind[ExecutionContext].to[ExecutionContextExecutor],
          bind[Executor].to[ExecutionContextExecutor],
          bind[HttpExecutionContext].toSelf,

          bind[CryptoConfig].toProvider[CryptoConfigParser],
          bind[CookieSigner].toProvider[CookieSignerProvider],
          bind[CSRFTokenSigner].toProvider[CSRFTokenSignerProvider],
          bind[AESCrypter].toProvider[AESCrypterProvider],
          bind[play.api.libs.Crypto].toSelf,
          bind[TemporaryFileCreator].to[DefaultTemporaryFileCreator]
        ) ++ dynamicBindings(
            HttpErrorHandler.bindingsFromConfiguration,
            HttpFilters.bindingsFromConfiguration,
            HttpRequestHandler.bindingsFromConfiguration,
            ActionCreator.bindingsFromConfiguration
          )
    }
}

I have tried creating my own GuiceApplicationBuilder to bypass this; however, that just moves the duplicate binding exception so that it now comes from the BuiltinModule instead.

Here is what I am trying:

AkkaConfigModule:

package module.akka

import com.google.inject.{AbstractModule, Inject, Provider, Singleton}
import com.typesafe.config.Config
import module.akka.AkkaConfigModule.AkkaConfigProvider
import net.codingwell.scalaguice.ScalaModule
import play.api.Application

/**
  * Created by dmcquill on 8/15/16.
  */
object AkkaConfigModule {
    @Singleton
    class AkkaConfigProvider @Inject() (application: Application) extends Provider[Config] {
        override def get() = {
            val classLoader = application.classloader
            NodeConfigurator.loadConfig(classLoader)
        }
    }
}

/**
  * Binds the application configuration to the [[Config]] interface.
  *
  * The config is bound as an eager singleton so that errors in the config are detected
  * as early as possible.
  */
class AkkaConfigModule extends AbstractModule with ScalaModule {

    override def configure(): Unit = {
        bind[Config].toProvider[AkkaConfigProvider].asEagerSingleton()
    }

}

ActorSystemModule:

package module.akka


import actor.cluster.ClusterMonitor
import akka.actor.ActorSystem
import com.google.inject._
import com.typesafe.config.Config
import net.codingwell.scalaguice.ScalaModule
import play.api.inject.ApplicationLifecycle

import scala.collection.JavaConversions._

/**
  * Created by dmcquill on 7/27/16.
  */
object ActorSystemModule {
    @Singleton
    class ActorSystemProvider @Inject() (val lifecycle: ApplicationLifecycle, val config: Config, val injector: Injector) extends Provider[ActorSystem] {
        override def get() = {
            val system = ActorSystem(config.getString(NodeConfigurator.CLUSTER_NAME_PROP), config.getConfig("fitnessApp"))

            // add the GuiceAkkaExtension to the system, and initialize it with the Guice injector
            GuiceAkkaExtension(system).initialize(injector)

            system.log.info("Configured seed nodes: " + config.getStringList("fitnessApp.akka.cluster.seed-nodes").mkString(", "))
            system.actorOf(GuiceAkkaExtension(system).props(ClusterMonitor.name))

            lifecycle.addStopHook { () =>
                system.terminate()
            }

            system
        }
    }
}

/**
  * A module providing an Akka ActorSystem.
  */
class ActorSystemModule extends AbstractModule with ScalaModule {
    import module.akka.ActorSystemModule.ActorSystemProvider

    override def configure(): Unit = {
        bind[ActorSystem].toProvider[ActorSystemProvider].asEagerSingleton()
    }
}

Application loader:

class CustomApplicationLoader extends GuiceApplicationLoader {

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
        initialBuilder
            .overrides(overrides(context): _*)
            .bindings(new AkkaConfigModule, new ActorSystemModule)
    }

}
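
One possible reason for the duplicate binding is that `bindings(...)` adds modules alongside Play's built-in ones, whereas `overrides(...)` replaces bindings that already exist. The sketch below shows that mechanism in isolation. `StandaloneActorSystemProvider`, `OverridingApplicationLoader`, and the system name `standalone-system` are hypothetical names for this example, and the provider deliberately omits the classloader and stop hook that a real setup would need.

```scala
import javax.inject.{Provider, Singleton}

import akka.actor.ActorSystem
import play.api.ApplicationLoader
import play.api.inject.bind
import play.api.inject.guice.{GuiceApplicationBuilder, GuiceApplicationLoader}

// Hypothetical provider, for illustration only: no Play classloader, no stop hook.
@Singleton
class StandaloneActorSystemProvider extends Provider[ActorSystem] {
  private lazy val system: ActorSystem = ActorSystem("standalone-system")
  override def get(): ActorSystem = system
}

class OverridingApplicationLoader extends GuiceApplicationLoader {
  override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder =
    super.builder(context)
      // overrides(...) replaces the built-in ActorSystem binding
      // instead of registering a second one next to it.
      .overrides(bind[ActorSystem].toProvider[StandaloneActorSystemProvider])
}
```

Whether this fits a given application depends on what else the custom provider needs injected, so treat it as a starting point rather than a drop-in fix.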

The main thing I need to accomplish is to configure the ActorSystem so that I can load the seed nodes of the Akka cluster programmatically.

Is the above approach the right one, or is there a better way to accomplish this? If it is the right approach, is there something I'm fundamentally not understanding about the DI setup for Play/Guice?

Update

For this architecture, Play and Akka are located on the same node.

Recommended answer

In the end, it turned out I was trying to do something more complicated than necessary. Instead of following the flow above, I simply extended the initial configuration programmatically so that the necessary networking information could be retrieved at startup.

The end result essentially consisted of a few classes:

NodeConfigurator: This object contains the utility methods used to retrieve properties from application.conf and then create a config programmatically, to be used in conjunction with a Kubernetes discovery service.

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}

object NodeConfigurator {

    /**
      * This method given a class loader will return the configuration object for an ActorSystem
      * in a clustered environment
      *
      * @param classLoader the configured classloader of the application
      * @return Config
      */
    def loadConfig(classLoader: ClassLoader) = {
        val config = ConfigFactory.load(classLoader)

        val clusterName = config.getString(CLUSTER_NAME_PROP)
        val seedPort = config.getString(SEED_PORT_CONF_PROP)

        val host = if (config.getString(HOST_CONF_PROP) equals "eth0-address-or-localhost") {
            getLocalHostAddress.getOrElse(DEFAULT_HOST_ADDRESS)
        } else {
            config.getString(HOST_CONF_PROP)
        }

        ConfigFactory.parseString(formatSeedNodesConfig(clusterName, getSeedNodes(config), seedPort, host))
            .withValue(HOST_CONF_PROP, ConfigValueFactory.fromAnyRef(host))
            .withValue("fitnessApp.akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(host))
            .withFallback(config)
            .resolve()
    }

    /**
      * Get the local ip address which defaults to localhost if not
      * found on the eth0 adapter
      *
      * @return Option[String]
      */
    def getLocalHostAddress:  Option[String] = {
        import java.net.NetworkInterface

        import scala.collection.JavaConversions._

        NetworkInterface.getNetworkInterfaces
            .find(_.getName equals "eth0")
            .flatMap { interface =>
                interface.getInetAddresses.find(_.isSiteLocalAddress).map(_.getHostAddress)
            }
    }

    /**
      * Retrieves a set of seed nodes that are currently running in our cluster
      *
      * @param config akka configuration object
      * @return Array[String]
      */
    def getSeedNodes(config: Config) = {
        if(config.hasPath(SEED_NODES_CONF_PROP)) {
            config.getString(SEED_NODES_CONF_PROP).split(",").map(_.trim)
        } else {
            Array.empty[String]
        }
    }

    /**
      * formats the seed node addresses in the proper format
      *
      * @param clusterName name of akka cluster
      * @param seedNodeAddresses listing of current seed nodes
      * @param seedNodePort configured seed node port
      * @param defaultSeedNodeAddress default seed node address
      * @return
      */
    def formatSeedNodesConfig(clusterName: String, seedNodeAddresses: Array[String], seedNodePort: String, defaultSeedNodeAddress: String) = {
        if(seedNodeAddresses.isEmpty) {
            s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://$clusterName@$defaultSeedNodeAddress:$seedNodePort" ]"""
        } else {
            seedNodeAddresses.map { address =>
                s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://$clusterName@$address:$seedNodePort""""
            }.mkString("\n")
        }
    }

    val CLUSTER_NAME_PROP = "fitnessAkka.cluster-name"
    val HOST_CONF_PROP = "fitnessAkka.host"
    val PORT_CONF_PROP = "fitnessAkka.port"
    val SEED_NODES_CONF_PROP = "fitnessAkka.seed-nodes"
    val SEED_PORT_CONF_PROP = "fitnessAkka.seed-port"

    private val DEFAULT_HOST_ADDRESS = "127.0.0.1"
}
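
To make the two branches of formatSeedNodesConfig concrete, here is a small standalone sketch of the same string formatting, using only the Scala standard library. `SeedNodeFormat` and its parameter names are made up for this example, and it takes a `Seq` rather than an `Array`; the host addresses are sample values.

```scala
object SeedNodeFormat {

  /** Mirrors the formatting logic of formatSeedNodesConfig above (illustrative copy). */
  def format(clusterName: String, seedHosts: Seq[String], seedPort: String, fallbackHost: String): String =
    if (seedHosts.isEmpty) {
      // No seed nodes known: this node becomes the only seed.
      s"""fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://$clusterName@$fallbackHost:$seedPort" ]"""
    } else {
      // One HOCON "+=" line per seed node; each line appends to the list.
      seedHosts
        .map(host => s"""fitnessApp.akka.cluster.seed-nodes += "akka.tcp://$clusterName@$host:$seedPort"""")
        .mkString("\n")
    }
}

object SeedNodeFormatDemo extends App {
  println(SeedNodeFormat.format("fitnessApp", Seq.empty, "2551", "127.0.0.1"))
  // fitnessApp.akka.cluster.seed-nodes = [ "akka.tcp://fitnessApp@127.0.0.1:2551" ]

  println(SeedNodeFormat.format("fitnessApp", Seq("172.17.0.3", "172.17.0.4"), "2551", "127.0.0.1"))
  // fitnessApp.akka.cluster.seed-nodes += "akka.tcp://fitnessApp@172.17.0.3:2551"
  // fitnessApp.akka.cluster.seed-nodes += "akka.tcp://fitnessApp@172.17.0.4:2551"
}
```

The generated text is HOCON, which is why the result can be handed straight to ConfigFactory.parseString.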

CustomApplicationLoader: Simply uses Play's overridable application loader to take the config produced by NodeConfigurator and extend the initialConfiguration with it.

class CustomApplicationLoader extends GuiceApplicationLoader {

    override def builder(context: ApplicationLoader.Context): GuiceApplicationBuilder = {
        val classLoader = context.environment.classLoader
        val configuration = Configuration(NodeConfigurator.loadConfig(classLoader))

        initialBuilder
                .in(context.environment)
                .loadConfig(context.initialConfiguration ++ configuration)
                .overrides(overrides(context): _*)
    }

}
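
The merge in the loader relies on the generated values taking precedence over those read from application.conf. As far as I know, Play's `Configuration.++` is built on Typesafe Config's `withFallback`, so the same precedence can be sketched with the config library alone. `ConfigMergeSketch` and the sample values are assumptions for this example.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object ConfigMergeSketch {

  // Stand-in for what application.conf might contain (illustrative values).
  val fromFile: Config = ConfigFactory.parseString(
    """
      |fitnessAkka.host = "eth0-address-or-localhost"
      |fitnessAkka.seed-port = "2551"
    """.stripMargin)

  // Stand-in for the programmatically generated settings.
  val generated: Config = ConfigFactory.parseString("""fitnessAkka.host = "172.17.0.3"""")

  // Generated values win; anything they do not define falls back to the file.
  val merged: Config = generated.withFallback(fromFile)
}
```

Here `merged.getString("fitnessAkka.host")` yields the generated address, while `fitnessAkka.seed-port` still comes from the file. For Play to pick up a custom loader at all, application.conf also needs a `play.application.loader` entry naming the loader class.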

AkkaActorModule: Provides a dependency injectable actor ref for use with an API to display the cluster members.

class AkkaActorModule extends AbstractModule with AkkaGuiceSupport {
    def configure = {
        bindActor[ClusterMonitor]("cluster-monitor")
    }
}

ClusterMonitor: This actor listens to cluster events and also responds to messages requesting the current cluster state.

class ClusterMonitor @Inject() extends Actor with ActorLogging {
    import actor.cluster.ClusterMonitor.GetClusterState

    val cluster = Cluster(context.system)
    private var nodes = Set.empty[Address]

    override def preStart(): Unit = {
        cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
    }

    override def postStop(): Unit = cluster.unsubscribe(self)

    override def receive = {
        case MemberUp(member) => {
            nodes += member.address
            log.info(s"Cluster member up: ${member.address}")
        }
        case UnreachableMember(member) => log.warning(s"Cluster member unreachable: ${member.address}")
        case MemberRemoved(member, previousStatus) => {
            nodes -= member.address
            log.info(s"Cluster member removed: ${member.address}")
        }
        case MemberExited(member) => log.info(s"Cluster member exited: ${member.address}")
        case GetClusterState => sender() ! nodes
        case _: MemberEvent =>
    }

}

object ClusterMonitor {
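    // A parameterless message is more idiomatically a case object;
    // the actor and the controller both refer to it as `GetClusterState`.
    case object GetClusterState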
}

Application: Simply a test controller to output a list of nodes that have joined the cluster

class Application @Inject() (@Named("cluster-monitor") clusterMonitorRef: ActorRef) extends Controller {

    implicit val addressWrites = new Writes[Address] {
        def writes(address: Address) = Json.obj(
            "host" -> address.host,
            "port" -> address.port,
            "protocol" -> address.protocol,
            "system" -> address.system
        )
    }

    implicit val timeout = Timeout(5, TimeUnit.SECONDS)

    def listClusterNodes = Action.async {
        (clusterMonitorRef ? GetClusterState).mapTo[Set[Address]].map { addresses =>
            Ok(Json.toJson(addresses))
        }
    }

}

The controller above produces output similar to the following:

$ http GET 192.168.99.100:30760/cluster/nodes

HTTP/1.1 200 OK
Content-Length: 235
Content-Type: application/json
Date: Thu, 18 Aug 2016 02:50:30 GMT

[
    {
        "host": "172.17.0.3", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }, 
    {
        "host": "172.17.0.4", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }, 
    {
        "host": "172.17.0.5", 
        "port": 2551, 
        "protocol": "akka.tcp", 
        "system": "fitnessApp"
    }
]
