无法删除 IDE 中 Kafka Stream 应用程序的状态目录 [英] Failed to delete the state directory in IDE for Kafka Stream Application

查看:17
本文介绍了无法删除 IDE 中 Kafka Stream 应用程序的状态目录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发一个简单的 Kafka Stream 应用程序,它从一个主题中提取消息并在转换后将其放入另一个主题.我正在使用 Intelij 进行开发.

I am developing a simple Kafka Stream application which extracting messages from a topic and put it into another topic after transformation. I am using Intelij for my development.

当我调试/运行这个应用程序时,如果我的 IDE 和 Kafka 服务器坐在相同的机器

When I debug/run this application, it works perfect if my IDE and the Kafka Server sitting in the SAME machine

(即使用 BOOTSTRAP_SERVERS_CONFIG = localhost:9092 和SCHEMA_REGISTRY_URL_CONFIG = 本地主机:8081)

(i.e. with the BOOTSTRAP_SERVERS_CONFIG = localhost:9092 and SCHEMA_REGISTRY_URL_CONFIG = localhost:8081)

但是,当我尝试使用另一台机器进行开发时

However, when I try to use another machine to do the development

(即使用 BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092 和SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081 其中 XXX.XXX.XXX 是我的Kafka的IP地址),

(i.e. with the BOOTSTRAP_SERVERS_CONFIG = XXX.XXX.XXX:9092 and SCHEMA_REGISTRY_URL_CONFIG = XXX.XXX.XXX:8081 where XXX.XXX.XXX is the ip address of my Kafka),

调试过程运行第一次没有问题.但是,当我在重置偏移量后第二次运行时,收到以下错误:

the debug process run without problem at the 1st time. However, when I run 2nd time after resetting the offset, I received the following error:

ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) 
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\my_application_id\0_0
Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException:

如果我将 my_application_id 更改为 my_application_id2 并运行它,它会在第一次再次运行,但如果我再次运行它,则会再次收到错误.

If I changed my_application_id as my_application_id2, and run it, it works again at the 1st time but receiving error again if I run it again.

我的应用程序的最后一句中有以下代码:

I have the following code in my last sentence in my application:

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

有什么建议可以解决这个问题吗?

Any advice how to solve this problem?

更新:

我已经查看了在我的开发机器(Windows 平台)中创建的状态目录,如果我在第二次运行之前手动删除这些目录,没有发现错误.我试图以管理员身份运行我的 IDE,因为我认为这可能与文件夹的权限有关.但是,这无济于事.

I have reviewed the state directory created in my development machine (Windows Platform) and if I delete these directory manually before running 2nd time, no error found. I have tried to run my IDE as Administrator because I think this could be something about the permission on the folder. However, this doesn't help.

完整堆栈供参考:

信息卡夫卡版本:1.1.0 (org.apache.kafka.common.utils.AppInfoParser:109)信息 Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser:110)INFO 流线程 [main] 作为用户调用清理删除任务 0_0 的状态目录 0_0.(org.apache.kafka.streams.processor.internals.StateDirectory:281)与目标虚拟机断开连接,地址:'127.0.0.1:16552',传输:'socket'线程主"org.apache.kafka.streams.errors.StreamsException 中的异常:java.nio.file.DirectoryNotEmptyException:C:\workspace\bennychan\kafka-streams\my_application_001\0_0在 org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231)在 org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)在 com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60)在 com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45)引起:java.nio.file.DirectoryNotEmptyException:C:\workspace\bennychan\kafka-streams\my_application_001\0_0在 sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)在 sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)在 java.nio.file.Files.delete(Files.java:1126)在 org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651)在 org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634)在 java.nio.file.Files.walkFileTree(Files.java:2688)在 java.nio.file.Files.walkFileTree(Files.java:2742)在 org.apache.kafka.common.utils.Utils.delete(Utils.java:634)ERROR stream-thread [main] 无法删除状态目录.(org.apache.kafka.streams.processor.internals.StateDirectory:297)在 org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0在 org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228)在 sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)... 3个在 sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)在 java.nio.file.Files.delete(Files.java:1126)在 org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651)在 org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634)在 java.nio.file.Files.walkFileTree(Files.java:2688)在 java.nio.file.Files.walkFileTree(Files.java:2742)在 org.apache.kafka.common.utils.Utils.delete(Utils.java:634)在 org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)在 org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228)在 org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)在 com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60)在 com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45)

INFO Kafka version : 1.1.0 (org.apache.kafka.common.utils.AppInfoParser:109) INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser:110) INFO stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup. (org.apache.kafka.streams.processor.internals.StateDirectory:281) Disconnected from the target VM, address: '127.0.0.1:16552', transport: 'socket' Exception in thread "main" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:231) at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931) at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60) at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45) Caused by: java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266) at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) at java.nio.file.Files.delete(Files.java:1126) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634) at java.nio.file.Files.walkFileTree(Files.java:2688) at java.nio.file.Files.walkFileTree(Files.java:2742) at org.apache.kafka.common.utils.Utils.delete(Utils.java:634) ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory:297) at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287) java.nio.file.DirectoryNotEmptyException: C:\workspace\bennychan\kafka-streams\my_application_001\0_0 at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228) at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266) ... 3 more at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) at java.nio.file.Files.delete(Files.java:1126) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651) at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634) at java.nio.file.Files.walkFileTree(Files.java:2688) at java.nio.file.Files.walkFileTree(Files.java:2742) at org.apache.kafka.common.utils.Utils.delete(Utils.java:634) at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287) at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228) at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931) at com.macroviewhk.financialreport.simpleStream.start(simpleStream.java:60) at com.macroviewhk.financialreport.simpleStream.main(simpleStream.java:45)

更新2:再次详细检查后,下面这行抛出 IOException

UPDATE 2 : After another detailed check, the line below throwing IOException

Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() {

此行位于 kafka-clients-1.1.0.jar org.apache.kafka.common.utilsUtils.class

This line is located at kafka-clients-1.1.0.jar org.apache.kafka.common.utilsUtils.class

可能是windows系统的问题(抱歉我不是有经验的JAVA程序员).

May be this is the problem with Windows system (sorry that I am not an experienced JAVA programmer).

推荐答案

对于 googlers..

For googlers..

我目前正在使用这个 Scala 代码来帮助 Windows 人员处理状态存储的删除.

I'm currently using this Scala code for helping windows guys to handle deletion of state store.

if (System.getProperty("os.name").toLowerCase.contains("windows")) {
  logger.info("WINDOWS OS MODE - Cleanup state store.")
  try {
    FileUtils.deleteDirectory(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
    FileUtils.forceMkdir(new File("/tmp/kafka-streams/" + config.getProperty("application.id")))
  } catch {
    case e: Exception => logger.error(e.toString)
  }
}
else {
  streams.cleanUp()
}

这篇关于无法删除 IDE 中 Kafka Stream 应用程序的状态目录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆