MongoDB reactive template transactions

Question

I've been using MongoDB in my open source project for more than a year, and recently I decided to try out transactions. After writing some tests for methods that use transactions, I found that they throw some strange exceptions, and I can't figure out what the problem is. I have a method, delete, that uses a custom coroutine context and a mutex:

  open suspend fun delete(photoInfo: PhotoInfo): Boolean {
    return withContext(coroutineContext) {
      return@withContext mutex.withLock {
        return@withLock deletePhotoInternalInTransaction(photoInfo)
      }
    }
  }

It then calls a method that performs the deletions:

  //FIXME: doesn't work in tests
  //should be called from within locked mutex
  private suspend fun deletePhotoInternalInTransaction(photoInfo: PhotoInfo): Boolean {
    check(!photoInfo.isEmpty())

    val transactionMono = template.inTransaction().execute { txTemplate ->
      return@execute photoInfoDao.deleteById(photoInfo.photoId, txTemplate)
        .flatMap { favouritedPhotoDao.deleteFavouriteByPhotoName(photoInfo.photoName, txTemplate) }
        .flatMap { reportedPhotoDao.deleteReportByPhotoName(photoInfo.photoName, txTemplate) }
        .flatMap { locationMapDao.deleteById(photoInfo.photoId, txTemplate) }
        .flatMap { galleryPhotoDao.deleteByPhotoName(photoInfo.photoName, txTemplate) }
    }.next()

    return try {
      transactionMono.awaitFirst()
      true
    } catch (error: Throwable) {
      logger.error("Could not delete photo", error)
      false
    }
  }

Here I have five operations that delete data from five different documents. Here is an example of one of the operations:

  open fun deleteById(photoId: Long, template: ReactiveMongoOperations = reactiveTemplate): Mono<Boolean> {
    val query = Query()
      .addCriteria(Criteria.where(PhotoInfo.Mongo.Field.PHOTO_ID).`is`(photoId))

    return template.remove(query, PhotoInfo::class.java)
      .map { deletionResult -> deletionResult.wasAcknowledged() }
      .doOnError { error -> logger.error("DB error", error) }
      .onErrorReturn(false)
  }

I want the whole operation to fail if any of the deletions fails, so I use a transaction.

Then I have some tests for a handler that uses this delete method:

  @Test
  fun `photo should not be uploaded if could not enqueue static map downloading request`() {
    val webClient = getWebTestClient()
    val userId = "1234235236"
    val token = "fwerwe"

    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(false)
    }

    kotlin.run {
      val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
      val multipartData = createTestMultipartFile(PHOTO1, packet)

      val content = webClient
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(multipartData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

      val response = fromBodyContent<UploadPhotoResponse>(content)
      assertEquals(ErrorCode.DatabaseError.value, response.errorCode)

      assertEquals(0, findAllFiles().size)

      runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
      }
    }
  }

  @Test
  fun `photo should not be uploaded when resizeAndSavePhotos throws an exception`() {
    val webClient = getWebTestClient()
    val userId = "1234235236"
    val token = "fwerwe"

    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

      Mockito.doThrow(IOException("BAM"))
        .`when`(diskManipulationService).resizeAndSavePhotos(any(), any())
    }

    kotlin.run {
      val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
      val multipartData = createTestMultipartFile(PHOTO1, packet)

      val content = webClient
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(multipartData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

      val response = fromBodyContent<UploadPhotoResponse>(content)
      assertEquals(ErrorCode.ServerResizeError.value, response.errorCode)

      assertEquals(0, findAllFiles().size)

      runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
      }
    }
  }

  @Test
  fun `photo should not be uploaded when copyDataBuffersToFile throws an exception`() {
    val webClient = getWebTestClient()
    val userId = "1234235236"
    val token = "fwerwe"

    runBlocking {
      Mockito.`when`(remoteAddressExtractorService.extractRemoteAddress(any())).thenReturn(ipAddress)
      Mockito.`when`(banListRepository.isBanned(Mockito.anyString())).thenReturn(false)
      Mockito.`when`(userInfoRepository.accountExists(userId)).thenReturn(true)
      Mockito.`when`(userInfoRepository.getFirebaseToken(Mockito.anyString())).thenReturn(token)
      Mockito.`when`(staticMapDownloaderService.enqueue(Mockito.anyLong())).thenReturn(true)

      Mockito.doThrow(IOException("BAM"))
        .`when`(diskManipulationService).copyDataBuffersToFile(Mockito.anyList(), any())
    }

    kotlin.run {
      val packet = UploadPhotoPacket(33.4, 55.2, userId, true)
      val multipartData = createTestMultipartFile(PHOTO1, packet)

      val content = webClient
        .post()
        .uri("/v1/api/upload")
        .contentType(MediaType.MULTIPART_FORM_DATA)
        .body(BodyInserters.fromMultipartData(multipartData))
        .exchange()
        .expectStatus().is5xxServerError
        .expectBody()

      val response = fromBodyContent<UploadPhotoResponse>(content)
      assertEquals(ErrorCode.ServerDiskError.value, response.errorCode)

      assertEquals(0, findAllFiles().size)

      runBlocking {
        assertEquals(0, galleryPhotoDao.testFindAll().awaitFirst().size)
        assertEquals(0, photoInfoDao.testFindAll().awaitFirst().size)
      }
    }
  }

Usually the first test passes and the next two fail with the following exception:

17:09:01.228 [Thread-17] ERROR com.kirakishou.photoexchange.database.dao.PhotoInfoDao - DB error
org.springframework.data.mongodb.UncategorizedMongoDbException: Command failed with error 24 (LockTimeout): 'Unable to acquire lock '{8368122972467948263: Database, 1450593944826866407}' within a max lock request timeout of '5ms' milliseconds.' on server 192.168.99.100:27017. 

Then:

Caused by: com.mongodb.MongoCommandException: Command failed with error 246 (SnapshotUnavailable): 'Unable to read from a snapshot due to pending collection catalog changes; please retry the operation. Snapshot timestamp is Timestamp(1545661357, 23). Collection minimum is Timestamp(1545661357, 24)' on server 192.168.99.100:27017.

And:

17:22:36.951 [Thread-16] WARN  reactor.core.publisher.FluxUsingWhen - Async resource cleanup failed after cancel
com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Transaction 1 has been aborted.' on server 192.168.99.100:27017. 

Sometimes two of them pass and the last one fails.

It looks like only the first transaction succeeds and any following will fail and I guess the reason is that I have to manually close it (or the ClientSession). But I can't find any info on how to close transactions/sessions. Here is one of the few examples I could find where they use transactions with reactive template and I don't see them doing anything additional to close transaction/session.

Or maybe it's because I'm mocking a method to throw an exception inside the transaction? Maybe it's not being closed in this case?

Recommended answer

The client sessions/transactions are closed properly. However, it appears that the index creation in the tests acquires a global lock, which causes the next transaction's lock request to queue behind it and time out.

Basically, you have to manage your index creation so that it doesn't interfere with transactions from the client.

One quick fix is to increase the lock timeout by running the following command in the shell:

db.adminCommand( { setParameter: 1, maxTransactionLockRequestTimeoutMillis: 50 } )

In production, you can check the error label on the failed transaction (for example TransientTransactionError) and retry the operation.

More here: https://docs.mongodb.com/manual/core/transactions-production-consideration/#pending-ddl-operations-and-transactions
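
The retry idea from the answer could look roughly like the sketch below. It is only an illustration, not the asker's or the driver's code: LabeledException is a hypothetical stand-in for the driver's MongoException (which exposes hasErrorLabel), and retryTransient and maxAttempts are made-up names. The whole transaction block is run again only when the failure carries the transient label; any other error is rethrown at once.

```kotlin
// Hypothetical stand-in for com.mongodb.MongoException, which offers hasErrorLabel(label).
class LabeledException(message: String, private val labels: Set<String>) : RuntimeException(message) {
    fun hasErrorLabel(label: String): Boolean = label in labels
}

// Label MongoDB attaches to transaction errors that are safe to retry.
const val TRANSIENT_TRANSACTION_ERROR = "TransientTransactionError"

// Runs the whole transaction block again while it fails with a transient error,
// up to maxAttempts times. Non-transient failures are rethrown immediately.
fun <T> retryTransient(maxAttempts: Int, block: (attempt: Int) -> T): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    var lastError: LabeledException? = null
    for (attempt in 1..maxAttempts) {
        try {
            return block(attempt)
        } catch (error: LabeledException) {
            if (!error.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR)) throw error
            lastError = error // transient: remember it and try again
        }
    }
    // All attempts failed with a transient error; surface the last one.
    throw checkNotNull(lastError)
}

fun main() {
    var calls = 0
    val result = retryTransient(maxAttempts = 3) { attempt ->
        calls++
        // Simulate two transient failures before the transaction goes through.
        if (attempt < 3) throw LabeledException("simulated transient failure", setOf(TRANSIENT_TRANSACTION_ERROR))
        "deleted"
    }
    println("$result after $calls attempts") // prints "deleted after 3 attempts"
}
```

In the question's code, the block passed to such a helper would be the whole template.inTransaction().execute { ... } call, so that every retry starts a fresh transaction. Note that deleteById as written converts errors into false via onErrorReturn, so a retry wrapper like this would only see errors that are allowed to propagate.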
