Starting Batch process from a stream job


Problem description

Hi, I have a Maven project for Flink stream processing. Based on the message I get from the stream I start a batch process, but currently I am getting an error.

I am pretty new to this Flink world, so please let me know if you have any idea. Here is the code I am using to start a standalone cluster.

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    KafkaConsumerService kafkaConsumerService = new KafkaConsumerService();
    FlinkKafkaConsumer010<String> kafkaConsumer = kafkaConsumerService.getKafkaConsumer(settings);
    DataStream<String> messageStream = env.addSource(kafkaConsumer).setParallelism(3);

    messageStream
            .filter(new MyFilter()).setParallelism(3).name("Filter")
            .map(new ProcessFile(arg)).setParallelism(3).name("start batch")
            .addSink(new DiscardingSink()).setParallelism(3).name("DiscardData");

    env.execute("Stream processor");

// ProcessFile map class

    public ProcessFile(String arg) { }

    @Override
    public String map(String message) throws Exception {
        MessageType typedmessage = ParseMessage(message);
        if (isWhatIwant()) {
            String[] batchArgs = createBatchArgs();
            Configuration config = new Configuration();
            config.setString(JobManagerOptions.ADDRESS, jobMasterHost);
            config.setInteger(JobManagerOptions.PORT, jobMasterPort);

            StandaloneClusterClient client = new StandaloneClusterClient(config);
            client.setDetached(true);
            PackagedProgram program = new PackagedProgram(new File(jarLocation), SupplyBatchJob.class.getName(), batchArgs);
            client.run(program, 7);
        }

        return typedmessage;
    }


The error is copied from the JobManager web portal. The error I am getting:

    org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
        at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:497)
        at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
        at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
        at cw.supply.data.parser.maps.ProcessFileMessage.map(ProcessFileMessage.java:47)
        at cw.supply.data.parser.maps.ProcessFileMessage.map(ProcessFileMessage.java:25)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:103)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:110)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:269)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:86)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:483)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:95)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
        at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:789)
        at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:495)
        ... 30 more
    Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway.
        at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:79)
        at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:784)
        ... 31 more
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at scala.concurrent.Await.result(package.scala)
        at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:77)
        ... 32 more

Recommended answer

I figured out what the issue was after getting access to the environment and verifying it. I was using the public address of the JobManager, where the port is not open. Instead I started using the private IP, since all nodes are in the same subnet and there is no need to open the port to the world. Hope this helps someone else too.
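A quick way to verify this kind of connectivity problem, before changing any Flink configuration, is a plain TCP reachability check run from the machine where the map function executes. This is only an illustrative sketch; the host `10.0.0.5`, the port `6123` (Flink's default JobManager RPC port), and the timeout are placeholder values, not taken from the question.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class JobManagerReachability {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder values: substitute your JobManager's private IP and RPC port.
        String jobMasterHost = "10.0.0.5";
        int jobMasterPort = 6123;
        System.out.println("JobManager reachable: "
                + isReachable(jobMasterHost, jobMasterPort, 3000));
    }
}
```

If the check fails for the JobManager's public address but succeeds for its private IP, that points to the same firewall/subnet situation described above, and the address passed to `JobManagerOptions.ADDRESS` should be the reachable one.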
