如何将 akka 流 kafka(reactive-kafka)集成到 akka http 应用程序中? [英] How to integrate akka streams kafka (reactive-kafka) into akka http application?

查看:29
本文介绍了如何将 akka 流 kafka(reactive-kafka)集成到 akka http 应用程序中?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个基本的 scala akka http CRUD 应用程序.相关课程见下文.

I have a basic scala akka http CRUD application. See below for the relevant classes.

我只想在例如创建/更新实体时将实体 ID 和一些数据(作为 json)写入 Kafka 主题.

I'd simply like to write an entity id and some data (as json) to a Kafka topic whenever, for example, an entity is created/updated.

我正在查看 http://doc.akka.io/docs/akka-stream-kafka/current/producer.html,但我是 scala 和 akka 的新手,不确定如何将它集成到我的应用程序中?

I'm looking at http://doc.akka.io/docs/akka-stream-kafka/current/producer.html, but am new to scala and akka, and unsure of how to integrate it into my application?

例如,从上面的文档来看,这是一个生产者写入 kafka 的例子,所以我想我需要类似的东西,但是我的应用程序应该在哪里?创建用户后,我可以在我的服务的 create 方法中添加另一个地图调用吗?

For example, from the docs above, this is the example of a producer writing to kafka, so I think I need to something similar, but whereabouts in my application should this go? Can I just add another map call in the create method in my service after I have created the user?

非常感谢!

val done = Source(1 to 100)
  .map(_.toString)
  .map { elem =>
    new ProducerRecord[Array[Byte], String]("topic1", elem)
  }
  .runWith(Producer.plainSink(producerSettings))

或者我需要做一些类似于这里的例子 https://github.com/hseeberger/accessus 在我的 Server.scala 的 bindAndHandle() 方法中?

Or do I need to do something like the example here https://github.com/hseeberger/accessus in the bindAndHandle() method in my Server.scala?

WebServer.scala

WebServer.scala

object System {

  implicit val system = ActorSystem()
  implicit val dispatcher = system.dispatcher
  implicit val actorMaterializer = ActorMaterializer()

}

object WebServer extends App {

  import System._

  val config = new ApplicationConfig() with ConfigLoader
  ConfigurationFactory.setConfigurationFactory(new LoggingConfFileConfigurationFactory(config.loggingConfig))
  val injector = Guice.createInjector(new MyAppModule(config))
  val routes = injector.getInstance(classOf[Router]).routes
  Http().bindAndHandle(routes, config.httpConfig.interface, config.httpConfig.port)

}

Router.scala

Router.scala

def routes(): Route = {
    post {
      entity(as[User]) { user =>
        val createUser = userService.create(user)
        onSuccess(createUser) {
          case Invalid(y: NonEmptyList[Err]) =>  {
            throw new ValidationException(y)
          }
          case Valid(u: User) => {
              complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    } ~
    // More routes here, left out for example  
}

Service.scala

Service.scala

def create(user: User): Future[MaybeValid[User]] = {
    for {
      validating <- userValidation.validateCreate(user)
      result <- validating match {
        case Valid(x: User) =>
          userRepo.create(x)
            .map(dbUser => Valid(UserConverters.fromUserRow(x)))
        case y: DefInvalid =>
          Future{y}
      }
    } yield result
  }

repo.scala

def create(user: User): Future[User] = {
    mutateDbProvider.db.run(
      userTable returning userTable.map(_.userId)
        into ((user, id) => user.copy(userId = id)) +=
        user.copy(createdDate = Some(Timestamp.valueOf(LocalDateTime.now())))
    )
  }

推荐答案

自从您编写了 Route 以从 EntityUsercode> 我认为你不需要 Producer.plainSink.相反,我认为 Producer.send 也能正常工作.另外,作为旁注,抛出异常不是惯用的"scala.所以我改变了无效用户的逻辑:

Since you have written your Route to unmarshall just 1 User from the Entity I don't think you need Producer.plainSink. Rather, I think Producer.send will work just as well. Also, as a side note, throwing exceptions is not "idiomatic" scala. So I changed the logic for invalid user:

val producer : KafkaProducer = new KafkaProducer(producerSettings)

val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1",s"""{"userId" : ${u.userId}, "entity" : "User"}""")

          onComplete(producer send producerRecord) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

这篇关于如何将 akka 流 kafka(reactive-kafka)集成到 akka http 应用程序中?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆