如何将Akka Streams Kafka(reactive-kafka)集成到Akka HTTP应用程序中? [英] How to integrate akka streams kafka (reactive-kafka) into akka http application?

查看:186
本文介绍了如何将Akka Streams Kafka(reactive-kafka)集成到Akka HTTP应用程序中?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个基本的scala akka http CRUD应用程序。

I have a basic scala akka http CRUD application. See below for the relevant classes.

我只想在任何时候(例如,实体)将​​实体ID和一些数据(如json)写入Kafka主题。创建/更新。

I'd simply like to write an entity id and some data (as json) to a Kafka topic whenever, for example, an entity is created/updated.

我正在查看 http://doc.akka.io/docs/akka-stream-kafka/current/producer.html ,但对于scala和akka来说是新手,并且不确定如何将其集成到我的应用程序中?

I'm looking at http://doc.akka.io/docs/akka-stream-kafka/current/producer.html, but am new to scala and akka, and unsure of how to integrate it into my application?

例如,从上面的文档中可以看出,这是生产者写给kafka的示例,所以我认为我需要类似的东西,但是我的应用程序中的下落应该是走?创建用户后,能否在服务的create方法中添加另一个地图调用?

For example, from the docs above, this is the example of a producer writing to kafka, so I think I need to something similar, but whereabouts in my application should this go? Can I just add another map call in the create method in my service after I have created the user?

非常感谢!

val done = Source(1 to 100)
  .map(_.toString)
  .map { elem =>
    new ProducerRecord[Array[Byte], String]("topic1", elem)
  }
  .runWith(Producer.plainSink(producerSettings))

还是我需要在这里做类似的例子 https://github.com/hseeberger/accessus 在我的Server.scala中的bindAndHandle()方法中?

Or do I need to do something like the example here https://github.com/hseeberger/accessus in the bindAndHandle() method in my Server.scala?

WebServer.scala

WebServer.scala

object System {

  implicit val system = ActorSystem()
  implicit val dispatcher = system.dispatcher
  implicit val actorMaterializer = ActorMaterializer()

}

object WebServer extends App {

  import System._

  val config = new ApplicationConfig() with ConfigLoader
  ConfigurationFactory.setConfigurationFactory(new LoggingConfFileConfigurationFactory(config.loggingConfig))
  val injector = Guice.createInjector(new MyAppModule(config))
  val routes = injector.getInstance(classOf[Router]).routes
  Http().bindAndHandle(routes, config.httpConfig.interface, config.httpConfig.port)

}

Router.scala

Router.scala

def routes(): Route = {
    post {
      entity(as[User]) { user =>
        val createUser = userService.create(user)
        onSuccess(createUser) {
          case Invalid(y: NonEmptyList[Err]) =>  {
            throw new ValidationException(y)
          }
          case Valid(u: User) => {
              complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    } ~
    // More routes here, left out for example  
}

Service.scala

Service.scala

def create(user: User): Future[MaybeValid[User]] = {
    for {
      validating <- userValidation.validateCreate(user)
      result <- validating match {
        case Valid(x: User) =>
          userRepo.create(x)
            .map(dbUser => Valid(UserConverters.fromUserRow(x)))
        case y: DefInvalid =>
          Future{y}
      }
    } yield result
  }

Repo.scala

Repo.scala

def create(user: User): Future[User] = {
    mutateDbProvider.db.run(
      userTable returning userTable.map(_.userId)
        into ((user, id) => user.copy(userId = id)) +=
        user.copy(createdDate = Some(Timestamp.valueOf(LocalDateTime.now())))
    )
  }


推荐答案

由于您已经编写了 Route 来解组仅1个 User 实体中的c $ c>我认为您不需要 Producer.plainSink 。相反,我认为 Producer.send 也可以正常工作。另外,作为一个附带说明,抛出异常不是惯用的 scala。因此,我更改了无效用户的逻辑:

Since you have written your Route to unmarshall just 1 User from the Entity I don't think you need Producer.plainSink. Rather, I think Producer.send will work just as well. Also, as a side note, throwing exceptions is not "idiomatic" scala. So I changed the logic for invalid user:

val producer : KafkaProducer = new KafkaProducer(producerSettings)

val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1",s"""{"userId" : ${u.userId}, "entity" : "User"}""")

          onComplete(producer send producerRecord) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

这篇关于如何将Akka Streams Kafka(reactive-kafka)集成到Akka HTTP应用程序中?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆