Apache Flink: Could not extract key from ObjectNode::get
Problem description
I'm using Flink to process data coming from a data source (such as Kafka or Pravega).
In my case, the data source is Pravega, which provides a Flink connector.
My data source sends me JSON data like the following:
{"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}
Here is my code:
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(adapter)
    .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.keyBy(new KeySelector<ObjectNode, String>() {
    @Override
    public String getKey(ObjectNode node) throws Exception {
        return node.get("id").asText();
    }
}).print();
env.execute("StreamingJob");
As you can see, I used the FlinkPravegaReader and a proper deserializer to get the JSON stream coming from Pravega.
Then I try to keyBy it with a custom KeySelector and print it.
However, I get an error:
Caused by: java.lang.RuntimeException: Could not extract key from {"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}
It seems that node.get("id").asText() throws this exception.
I don't know why. As we can see, a key named id does exist in the JSON data. Why can't it be extracted? Have I used the ObjectNode class incorrectly, or is there some other reason?
Stack trace:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: fa9846e6834ae1391acbf51d5ad35aac)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: fa9846e6834ae1391acbf51d5ad35aac)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at myflink.StreamingJob.main(StreamingJob.java:137)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: fa9846e6834ae1391acbf51d5ad35aac)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not extract key from {"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at io.pravega.connectors.flink.FlinkPravegaReader.run(FlinkPravegaReader.java:307)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)
Caused by: java.lang.RuntimeException: Could not extract key from {"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:56)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:32)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 9 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
at myflink.StreamingJob$1.getKey(StreamingJob.java:125)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:54)
... 12 more
Recommended answer
You can check the rules for POJO types here.
By using POJO types, Flink can infer a lot of information about the data types that are exchanged and stored during the distributed computation.
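As for the error itself: the innermost cause in the stack trace is a ClassCastException (java.lang.String cannot be cast to ObjectNode) raised inside getKey. That suggests the objects arriving from the stream are Strings rather than ObjectNodes, presumably because the writer stored Java-serialized Strings. This is an assumption, since the question does not show the writer side. Because generics are erased at runtime, declaring the source as ObjectNode neither converts nor checks anything, so the mismatch only surfaces at the first place that performs a real cast, which here is the KeySelector. The JDK-only sketch below illustrates that effect. Event, emit, extractKey and BY_ID are hypothetical stand-ins, not Flink or Pravega APIs, and java.util.function.Function plays the role of KeySelector.

```java
import java.util.function.Function;

public class ErasureCastDemo {

    /** Hypothetical stand-in for the ObjectNode type the job expects. */
    static class Event {
        final String id;

        Event(String id) {
            this.id = id;
        }
    }

    /** Hypothetical stand-in for a generic source: the cast to T is unchecked and cannot fail. */
    @SuppressWarnings("unchecked")
    static <T> T emit(Object deserialized) {
        return (T) deserialized;
    }

    /** Applies a key selector the way a generic runtime would, knowing only erased types. */
    static <T> String extractKey(Function<T, String> selector, Object record) {
        try {
            T typed = emit(record);        // erased to a plain Object assignment, no check
            return selector.apply(typed);  // the selector's bridge method casts to Event
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    /** Anonymous class, like the KeySelector in the question. */
    static final Function<Event, String> BY_ID = new Function<Event, String>() {
        @Override
        public String apply(Event event) {
            return event.id;
        }
    };

    public static void main(String[] args) {
        // The record really is an Event, so the key is extracted.
        System.out.println(extractKey(BY_ID, new Event("b4728895")));
        // The record is a String declared as an Event: the failure shows up in the selector.
        System.out.println(extractKey(BY_ID, "{\"id\":\"b4728895\"}"));
    }
}
```

If that is what is happening in the job, one way out is to read the stream as String and parse each record in a map step before keying.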
The following code defines POJOs for your input.
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FlinkPOJO {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // Test source that emits the sample JSON from the question once per second.
        DataStream<String> source =
            env.addSource(new SourceFunction<String>() {
                private volatile boolean running = true;

                @Override
                public void run(SourceContext<String> sourceContext) throws Exception {
                    while (running) {
                        sourceContext.collect("{\"device\":\"rand-numeric\",\"id\":\"b4728895-741f-466a-b87b-79c7590893b4\",\"origin\":\"1591095418904441036\",\"readings\":[{\"origin\":\"1591095418904328442\",\"valueType\":\"Int64\",\"name\":\"int\",\"device\":\"rand-numeric\",\"value\":\"0\"}]}");
                        Thread.sleep(1000);
                    }
                }

                @Override
                public void cancel() {
                    running = false;
                }
            });

        // Parse each JSON string into the Info POJO.
        DataStream<Info> parsedSource =
            source.map(new MapFunction<String, Info>() {
                @Override
                public Info map(String s) throws Exception {
                    Gson gson = new Gson();
                    return gson.fromJson(s, Info.class);
                }
            });

        // Key by id and count the records per key in one-second windows.
        DataStream<String> output = parsedSource.keyBy(Info::getId).timeWindow(Time.seconds(1))
            .process(new ProcessWindowFunction<Info, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context context, Iterable<Info> iterable, Collector<String> collector) throws Exception {
                    int count = 0;
                    for (Info ignored : iterable) {
                        count++;
                    }
                    collector.collect(String.format("key : %s, size : %s", s, count));
                }
            });

        output.print();
        env.execute();
    }

    // Declared static: Flink's POJO rules exclude non-static inner classes.
    public static class Info {
        public String device;
        public String id;
        public String origin;
        public Reading[] readings;

        public Info() {
        }

        public String getDevice() {
            return device;
        }

        public void setDevice(String device) {
            this.device = device;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getOrigin() {
            return origin;
        }

        public void setOrigin(String origin) {
            this.origin = origin;
        }

        public Reading[] getReadings() {
            return readings;
        }

        public void setReadings(Reading[] readings) {
            this.readings = readings;
        }
    }

    public static class Reading {
        public String origin;
        public String valueType;
        public String name;
        public String device;
        public String value;

        public Reading() {
        }
    }
}
Actually, you can define a brief POJO which only contains the fields you need.
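As a rough sketch of such a brief POJO, assuming the job only needs the id field as its key: BriefInfo and BriefInfoSketch are hypothetical names chosen for illustration. The class is public and static, has a public no-argument constructor, and exposes its field through a getter and setter, which is what Flink's POJO rules ask for. Gson skips JSON properties that have no matching field by default, so parsing the full event into this class with gson.fromJson should leave the other properties out.

```java
public class BriefInfoSketch {

    /** Brief POJO (hypothetical name) that keeps only the field used as the key. */
    public static class BriefInfo {
        private String id;

        // Public no-argument constructor, required by Flink's POJO rules.
        public BriefInfo() {
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }
    }

    public static void main(String[] args) {
        BriefInfo info = new BriefInfo();
        info.setId("b4728895-741f-466a-b87b-79c7590893b4");
        // The getter is what a keyBy(BriefInfo::getId) call would use.
        System.out.println(info.getId());
    }
}
```

With this class in place, the keyBy step would reference BriefInfo::getId instead of Info::getId.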