How to integrate akka streams kafka (reactive-kafka) into akka http application?
Problem description
I have a basic scala akka http CRUD application. See below for the relevant classes.
I'd simply like to write an entity id and some data (as json) to a Kafka topic whenever, for example, an entity is created/updated.
I'm looking at http://doc.akka.io/docs/akka-stream-kafka/current/producer.html, but I am new to Scala and Akka and unsure how to integrate it into my application.
For example, the docs above give this example of a producer writing to Kafka, so I think I need to do something similar, but whereabouts in my application should it go? Can I just add another map call in the create method of my service after I have created the user?
Many thanks!
val done = Source(1 to 100)
  .map(_.toString)
  .map { elem =>
    new ProducerRecord[Array[Byte], String]("topic1", elem)
  }
  .runWith(Producer.plainSink(producerSettings))
Or do I need to do something like the example here https://github.com/hseeberger/accessus in the bindAndHandle() method in my Server.scala?
WebServer.scala
object System {
  implicit val system = ActorSystem()
  implicit val dispatcher = system.dispatcher
  implicit val actorMaterializer = ActorMaterializer()
}

object WebServer extends App {
  import System._

  val config = new ApplicationConfig() with ConfigLoader
  ConfigurationFactory.setConfigurationFactory(new LoggingConfFileConfigurationFactory(config.loggingConfig))
  val injector = Guice.createInjector(new MyAppModule(config))
  val routes = injector.getInstance(classOf[Router]).routes
  Http().bindAndHandle(routes, config.httpConfig.interface, config.httpConfig.port)
}
Router.scala
def routes(): Route = {
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) => {
          throw new ValidationException(y)
        }
        case Valid(u: User) => {
          complete(ToResponseMarshallable((StatusCodes.Created, u)))
        }
      }
    }
  } ~
  // More routes here, left out for example
}
Service.scala
def create(user: User): Future[MaybeValid[User]] = {
  for {
    validating <- userValidation.validateCreate(user)
    result <- validating match {
      case Valid(x: User) =>
        userRepo.create(x)
          .map(dbUser => Valid(UserConverters.fromUserRow(x)))
      case y: DefInvalid =>
        Future{y}
    }
  } yield result
}
repo.scala
def create(user: User): Future[User] = {
  mutateDbProvider.db.run(
    userTable returning userTable.map(_.userId)
      into ((user, id) => user.copy(userId = id)) +=
      user.copy(createdDate = Some(Timestamp.valueOf(LocalDateTime.now())))
  )
}
Recommended answer
Since you have written your Route to unmarshal just one User from the Entity, I don't think you need Producer.plainSink. Rather, I think Producer.send will work just as well. Also, as a side note, throwing exceptions is not "idiomatic" Scala, so I changed the logic for an invalid user:
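// Assumes producerSettings is an akka.kafka.ProducerSettings[Array[Byte], String];
// KafkaProducer has no constructor that accepts it, so build the producer from the settings.
val producer: KafkaProducer[Array[Byte], String] = producerSettings.createKafkaProducer()

val routes: Route =
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>
          complete(StatusCodes.BadRequest -> "invalid user")
        case Valid(u: User) =>
          val producerRecord =
            new ProducerRecord[Array[Byte], String]("topic1", s"""{"userId" : ${u.userId}, "entity" : "User"}""")
          // Note: KafkaProducer.send returns a java.util.concurrent.Future, while
          // onComplete expects a scala.concurrent.Future, so this call needs an adapter.
          onComplete(producer send producerRecord) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
      }
    }
  }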
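One detail worth spelling out: KafkaProducer's send method returns a java.util.concurrent.Future, whereas the onComplete directive takes a scala.concurrent.Future. A minimal sketch of one way to bridge the two is shown here, using the callback overload of send together with a Promise. The object name KafkaSend and the helper sendAsync are hypothetical (they are not part of Kafka or Akka), and the sketch assumes kafka-clients is on the classpath.

```scala
import org.apache.kafka.clients.producer.{Callback, Producer, ProducerRecord, RecordMetadata}
import scala.concurrent.{Future, Promise}

object KafkaSend {
  // Hypothetical helper, not part of Kafka or Akka: adapts the callback-based
  // Producer.send to a scala.concurrent.Future.
  def sendAsync[K, V](producer: Producer[K, V],
                      record: ProducerRecord[K, V]): Future[RecordMetadata] = {
    val promise = Promise[RecordMetadata]()
    // Kafka invokes the callback once the send has been acknowledged or has failed.
    // Exceptions thrown synchronously by send itself propagate to the caller.
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null) promise.failure(exception)
        else promise.success(metadata)
    })
    promise.future
  }
}
```

With such a helper, the route could call onComplete(KafkaSend.sendAsync(producer, producerRecord)) { _ => ... } and respond once Kafka has acknowledged the record or the send has failed.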