Integration - Apache Flink + Spring Boot

Problem description

I'm testing the integration between Apache Flink and Spring Boot. Running them in the IDE works fine, but when I tried to run the job on an Apache Flink cluster, I got an exception related to the ClassLoader.

The classes are very simple:

BootFlinkApplication

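import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication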
@ComponentScan("com.example.demo")
public class BootFlinkApplication {

    public static void main(String[] args) {
        System.out.println("some test");
        SpringApplication.run(BootFlinkApplication.class, args);
    }
}

FlinkTest

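import javax.annotation.PostConstruct;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Service;

@Service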
public class FlinkTest {
    @PostConstruct
    public void init() {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        see.fromElements(1, 2, 3, 4)
            .filter(new RemoveNumber3Filter()).print();

        try {
            see.execute();
        } catch (Exception e) {
            System.out.println("Error executing flink job: " + e.getMessage());
        }
    }
}

RemoveNumber3Filter

import org.apache.flink.api.common.functions.FilterFunction;

public class RemoveNumber3Filter implements FilterFunction<Integer> {

    @Override
    public boolean filter(Integer i) throws Exception {
        return i != 3;
    }

}

Exception:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.demo.RemoveNumber3Filter
    ClassLoader info: URL ClassLoader:
        file: '/tmp/blobStore-850f3189-807e-4f8d-a8a6-3bd3c1bd76b4/job_eb93b239080b4d4e09f10f1e3605744d/blob_p-5fd56f3348976c0d333d680fde4a79573c21cd40-48ac0995eee11f38ce3ff4f890102af8' (valid JAR)
    Class not resolvable through given classloader.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)

Recommended answer

You are probably using the Spring Boot Maven plugin (https://docs.spring.io/spring-boot/docs/current/reference/html/build-tool-plugins-maven-plugin.html) to repackage your jar into an executable jar. The repackaged jar uses a custom boot layout that Apache Flink's internal class loader does not support. Next to the jar you are trying to deploy there should be the original jar file (.jar.original), which you can deploy on the Flink cluster instead.
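To see which of the two jars you are about to deploy, you can check where a class entry sits inside the archive. The following is a minimal sketch and not part of the original answer: the class `JarLayoutCheck` and its `locate` helper are hypothetical names, and it assumes the repackaged jar keeps application classes under `BOOT-INF/classes/` (the layout Spring Boot executable jars have used since version 1.4). A plain URL class loader looks for `com/example/demo/RemoveNumber3Filter.class` at the root of the jar, so a class found only under `BOOT-INF/classes/` would not be resolvable.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarLayoutCheck {

    /** Where a class entry was found inside the jar. */
    enum Location { ROOT, BOOT_INF, MISSING }

    /**
     * Looks up the .class entry for className, first at the jar root
     * (where a plain URL class loader expects it), then under
     * BOOT-INF/classes/ (assumed Spring Boot executable-jar layout).
     */
    static Location locate(String jarPath, String className) throws IOException {
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            if (jar.getJarEntry(entry) != null) {
                return Location.ROOT;
            }
            if (jar.getJarEntry("BOOT-INF/classes/" + entry) != null) {
                return Location.BOOT_INF;
            }
            return Location.MISSING;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java JarLayoutCheck <jar> <fully.qualified.ClassName>");
            return;
        }
        System.out.println(locate(args[0], args[1]));
    }
}
```

Run it once against the jar you submitted and once against the `.jar.original` file, passing `com.example.demo.RemoveNumber3Filter` as the class name. If the layout assumption holds, the repackaged jar should report `BOOT_INF` and the original jar `ROOT`.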

As an alternative, you can build a jar that contains all your dependencies in a different way, for example with the maven-shade plugin (https://maven.apache.org/plugins/maven-shade-plugin/).
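For illustration, a shade setup in `pom.xml` could look roughly like the fragment below. This is a sketch under assumptions, not configuration taken from the original answer: the plugin version is only an example, the main class is assumed to live in the `com.example.demo` package, and a real Spring application may need additional resource transformers (for example for `META-INF/spring.factories`) that are not shown here. It also assumes the Spring Boot repackage goal is not applied to the same artifact.

```xml
<!-- Sketch only: version and mainClass are assumptions; place inside <build><plugins>. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.5.1</version>
  <executions>
    <execution>
      <!-- Build the shaded jar during the package phase. -->
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Writes Main-Class into the manifest of the shaded jar. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>com.example.demo.BootFlinkApplication</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

A shaded jar keeps classes at the root of the archive, which is the layout a plain URL class loader expects.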
