Apache Flink: Could not extract key from ObjectNode::get

Question

I'm using Flink to process data coming from a data source (such as Kafka or Pravega).

In my case, the data source is Pravega, which provides a Flink connector.

My data source sends me JSON data like this:

{"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}

Here is a piece of my code:

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(adapter)
    .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.keyBy(new KeySelector<ObjectNode, String>() {
        @Override
        public String getKey(ObjectNode node) throws Exception {
            return node.get("id").asText();
        }
    }).print();
env.execute("StreamingJob");

As you can see, I use the FlinkPravegaReader and a proper deserializer to read the JSON stream coming from Pravega.

Then I try to keyBy it with a custom KeySelector and print it.

However, I get an error:

Caused by: java.lang.RuntimeException: Could not extract key from {"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}

It seems that node.get("id").asText(); threw this exception.

I don't know why. As we can see, a key named id does exist in the JSON data. Why can't it be extracted? Have I used the ObjectNode class incorrectly, or is there some other reason?

Stack trace:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: fa9846e6834ae1391acbf51d5ad35aac)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: fa9846e6834ae1391acbf51d5ad35aac)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
        at myflink.StreamingJob.main(StreamingJob.java:137)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
        ... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: fa9846e6834ae1391acbf51d5ad35aac)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
        ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
        at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not extract key from {"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at io.pravega.connectors.flink.FlinkPravegaReader.run(FlinkPravegaReader.java:307)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)
Caused by: java.lang.RuntimeException: Could not extract key from {"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:56)
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:32)
        at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
        ... 9 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
        at myflink.StreamingJob$1.getKey(StreamingJob.java:125)
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:54)
        ... 12 more

Recommended answer

You can check the rules for POJO types in the Flink documentation (the section titled "Rules for POJO types").

By using POJO types, Flink can infer a lot of information about the data types that are exchanged and stored during the distributed computation.
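As I understand the documented rules, a POJO class must be public and standalone (not a non-static inner class), must have a public no-argument constructor, and every non-static, non-transient field must be either public and non-final or reachable through a public getter and setter. The sketch below approximates those checks with plain reflection. It is not Flink's own type extraction, it ignores the "is" prefix for boolean getters, and PojoRuleCheck, looksLikePojo, Good, and Inner are hypothetical names used only for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Rough approximation of the documented POJO rules; not a Flink API.
class PojoRuleCheck {

    static boolean looksLikePojo(Class<?> type) {
        // Rule 1: public, and not a non-static inner class.
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        if (type.isMemberClass() && !Modifier.isStatic(type.getModifiers())) {
            return false;
        }
        // Rule 2: a public constructor without arguments.
        try {
            type.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: each non-static, non-transient field is public and non-final,
        // or has a public getter and setter.
        for (Class<?> c = type; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                int mods = field.getModifiers();
                if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                    continue;
                }
                boolean publicNonFinal = Modifier.isPublic(mods) && !Modifier.isFinal(mods);
                if (!publicNonFinal && !hasAccessors(type, field)) {
                    return false;
                }
            }
        }
        return true;
    }

    // Looks for getX() and setX(fieldType) following Java beans naming.
    private static boolean hasAccessors(Class<?> type, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Static nested class with a public field and a private field with accessors.
    public static class Good {
        public String id;
        private String device;

        public Good() {
        }

        public String getDevice() {
            return device;
        }

        public void setDevice(String device) {
            this.device = device;
        }
    }

    // Non-static inner class: fails the "standalone" rule.
    public class Inner {
        public String id;

        public Inner() {
        }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(Good.class));  // prints true
        System.out.println(looksLikePojo(Inner.class)); // prints false
    }
}
```

A class that fails these checks can usually still be used in a job, but Flink then treats it as a generic type rather than a POJO.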

The following code defines POJOs for your input.

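import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.Iterator;

public class FlinkPOJO {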

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataStream<String> source =
                env.addSource(new SourceFunction<String>() {
                    // Cleared by cancel() so that run() can return.
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<String> sourceContext) throws Exception {
                        // Emit the sample JSON event from the question once per second.
                        while (running) {
                            sourceContext.collect("{\"device\":\"rand-numeric\",\"id\":\"b4728895-741f-466a-b87b-79c7590893b4\",\"origin\":\"1591095418904441036\",\"readings\":[{\"origin\":\"1591095418904328442\",\"valueType\":\"Int64\",\"name\":\"int\",\"device\":\"rand-numeric\",\"value\":\"0\"}]}");
                            Thread.sleep(1000);
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                });
        DataStream<Info> parsedSource =
                source.map(new MapFunction<String, Info>() {
                    @Override
                    public Info map(String s) throws Exception {
                        Gson gson = new Gson();
                        return gson.fromJson(s, Info.class);
                    }
                });

        DataStream<String> output = parsedSource.keyBy(Info::getId).timeWindow(Time.seconds(1))
                .process(new ProcessWindowFunction<Info, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<Info> iterable, Collector<String> collector) throws Exception {
                        int count = 0;
                        Iterator<Info> iterator = iterable.iterator();
                        while (iterator.hasNext()) {
                            count++;
                            iterator.next();
                        }
                        collector.collect(String.format("key : %s, size : %s", s, count));
                    }
                });
        output.print();
        env.execute();
    }


    // Declared static: a non-static inner class does not satisfy Flink's POJO rules.
    public static class Info {
        public String getDevice() {
            return device;
        }

        public void setDevice(String device) {
            this.device = device;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getOrigin() {
            return origin;
        }

        public void setOrigin(String origin) {
            this.origin = origin;
        }

        public Reading[] getReadings() {
            return readings;
        }

        public void setReadings(Reading[] readings) {
            this.readings = readings;
        }

        public String device;
        public String id;
        public String origin;
        public Reading[] readings;

        public Info() {

        }


    }

    // Declared static for the same reason: POJO classes must be standalone.
    public static class Reading {
        public String origin;
        public String valueType;
        public String name;
        public String device;
        public String value;

        public Reading() {

        }
    }
}
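
The innermost cause in the stack trace is a ClassCastException: the elements reaching getKey are java.lang.String instances, not ObjectNode. That is consistent with the code above, which reads each event as a String and parses the JSON in a separate map step. The sketch below is a plain-Java illustration, not Flink or Pravega code, of how a generic deserializer can return a String under a different declared type and fail only when the key is extracted. It assumes the events were written as Java-serialized Strings; ErasureDemo and its methods are hypothetical names, and HashMap stands in for ObjectNode.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.function.Function;

// Hypothetical stand-in for a generic Java-serialization deserializer.
class ErasureDemo {

    static byte[] serialize(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return buffer.toByteArray();
    }

    // The cast to T is unchecked: at runtime T is erased, so this method
    // returns whatever object was written, regardless of the declared type.
    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Applies a key selector typed for HashMap to an untyped element,
    // roughly as a framework would after passing the record around as Object.
    static String keyOf(Object element) {
        Function<HashMap<String, String>, String> keySelector = node -> node.get("id");
        @SuppressWarnings({"unchecked", "rawtypes"})
        Function<Object, String> erased = (Function) keySelector;
        return erased.apply(element);
    }

    public static void main(String[] args) {
        // The writer stored the JSON text as a serialized String.
        byte[] bytes = serialize("{\"id\":\"b4728895-741f-466a-b87b-79c7590893b4\"}");

        // Deserialization itself succeeds and yields a String.
        Object element = deserialize(bytes);
        System.out.println("Runtime type: " + element.getClass().getName());

        try {
            keyOf(element);
        } catch (ClassCastException e) {
            // The failure only surfaces at the point where the key is extracted.
            System.out.println("Key extraction failed: " + e.getMessage());
        }
    }
}
```

If that assumption holds for your stream, reading the events as String and parsing them afterwards, as the example above does with Gson, avoids the cast.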

In fact, you can define a shorter POJO that contains only the fields you need.
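
As a minimal sketch of such a shorter POJO, the class below keeps only the id field used as the key. BriefInfo and BriefPojoDemo are hypothetical names. The sketch runs outside Flink and only shows that the same getter reference used in keyBy works on the reduced class. Gson ignores JSON properties that have no matching field by default, so gson.fromJson(s, BriefInfo.class) should accept the full event.

```java
import java.util.function.Function;

// Hypothetical reduced POJO: only the key field is declared.
class BriefPojoDemo {

    public static class BriefInfo {
        public String id;

        public BriefInfo() {
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }
    }

    public static void main(String[] args) {
        BriefInfo info = new BriefInfo();
        info.setId("b4728895-741f-466a-b87b-79c7590893b4");

        // Same shape of method reference as keyBy(Info::getId), applied directly.
        Function<BriefInfo, String> keySelector = BriefInfo::getId;
        System.out.println(keySelector.apply(info)); // prints b4728895-741f-466a-b87b-79c7590893b4
    }
}
```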
