Integration - Apache Flink + Spring Boot
Problem description
I'm testing the integration between Apache Flink and Spring Boot. Running them in the IDE works fine, but when I tried to run the job on an Apache Flink cluster I got an exception related to the ClassLoader.
The classes are very simple:
BootFlinkApplication
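import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan("com.example.demo")
public class BootFlinkApplication {
    public static void main(String[] args) {
        System.out.println("some test");
        SpringApplication.run(BootFlinkApplication.class, args);
    }
}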
FlinkTest
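import javax.annotation.PostConstruct;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Service;

@Service
public class FlinkTest {
    // Builds and runs the Flink job as soon as Spring has created this bean.
    @PostConstruct
    public void init() {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.fromElements(1, 2, 3, 4)
            .filter(new RemoveNumber3Filter()).print();
        try {
            see.execute();
        } catch (Exception e) {
            System.out.println("Error executing flink job: " + e.getMessage());
        }
    }
}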
RemoveNumber3Filter
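import org.apache.flink.api.common.functions.FilterFunction;

public class RemoveNumber3Filter implements FilterFunction<Integer> {
    // Keeps every element except the number 3.
    @Override
    public boolean filter(Integer i) throws Exception {
        return i != 3;
    }
}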
Exception:
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.demo.RemoveNumber3Filter
ClassLoader info: URL ClassLoader:
file: '/tmp/blobStore-850f3189-807e-4f8d-a8a6-3bd3c1bd76b4/job_eb93b239080b4d4e09f10f1e3605744d/blob_p-5fd56f3348976c0d333d680fde4a79573c21cd40-48ac0995eee11f38ce3ff4f890102af8' (valid JAR)
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Recommended answer
You are probably using the Spring Boot Maven plugin (https://docs.spring.io/spring-boot/docs/current/reference/html/build-tool-plugins-maven-plugin.html) to repackage your jar into an executable jar. The repackaged jar uses a custom boot layout (since Spring Boot 1.4, application classes are placed under BOOT-INF/classes/ rather than at the jar root), and Apache Flink's internal class loader does not support that layout, which is why it cannot resolve com.example.demo.RemoveNumber3Filter. There should be an original jar file next to the one you are trying to deploy (.jar.original), which you could use for deployment on the Flink cluster. Keep in mind that the original jar contains only your own classes, not your dependencies.
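To confirm that the jar layout is the cause, you can check where the class entry actually sits inside the jar you are submitting. The following is a minimal sketch, not part of the original answer: `JarLayoutCheck` and `locate` are hypothetical names, and the `BOOT-INF/classes/` prefix assumes the layout used by Spring Boot 1.4 and later.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Flink or Spring Boot): reports where the
// .class entry for a given class is located inside a jar file.
public class JarLayoutCheck {

    public static String locate(String jarPath, String className) throws IOException {
        // com.example.demo.Foo -> com/example/demo/Foo.class
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            if (jar.getEntry(entry) != null) {
                // Plain jar layout: a URL class loader over this jar can resolve the class.
                return "root";
            }
            if (jar.getEntry("BOOT-INF/classes/" + entry) != null) {
                // Assumed Spring Boot (1.4+) repackaged layout: the class is nested.
                return "BOOT-INF/classes";
            }
            return "missing";
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: JarLayoutCheck <jar> <fully.qualified.ClassName>");
            return;
        }
        System.out.println(locate(args[0], args[1]));
    }
}
```

Run it against both the repackaged jar and the .jar.original file, passing com.example.demo.RemoveNumber3Filter as the class name. If the repackaged jar reports BOOT-INF/classes while the original reports root, the layout is the likely reason Flink cannot load the class.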
As an alternative, you can build a jar that contains all your dependencies in a different way, for example with the maven-shade plugin (https://maven.apache.org/plugins/maven-shade-plugin/).