Apache Flink:无法从 ObjectNode::get 中提取密钥 [英] Apache Flink: Could not extract key from ObjectNode::get
问题描述
我正在使用 Flink 处理来自某个数据源(例如 Kafka、Pravega 等)的数据.
I'm using Flink to process the data coming from some data source (such as Kafka, Pravega etc).
就我而言,数据源是 Pravega,它为我提供了一个 flink 连接器.
In my case, the data source is Pravega, which provided me a flink connector.
我的数据源正在向我发送一些 JSON 数据,如下所示:
My data source is sending me some JSON data as below:
{"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}
这是我的一段代码:
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
PravegaDeserializationSchema<ObjectNode> adapter = new PravegaDeserializationSchema<>(ObjectNode.class, new JavaSerializer<>());
FlinkPravegaReader<ObjectNode> source = FlinkPravegaReader.<ObjectNode>builder()
.withPravegaConfig(pravegaConfig)
.forStream(stream)
.withDeserializationSchema(adapter)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<ObjectNode> dataStream = env.addSource(source).name("Pravega Stream");
dataStream.keyBy(new KeySelector<ObjectNode, String>() {
@Override
public String getKey(ObjectNode node) throws Exception {
return node.get("id").asText();
}
}).print();
env.execute("StreamingJob");
如您所见,我使用了 FlinkPravegaReader
和适当的解串器来获取来自 Pravega 的 JSON 流.
As you see, I used the FlinkPravegaReader
and a proper deserializer to get the JSON stream coming from Pravega.
然后我尝试使用自定义 KeySelector KeyBy
它并打印它.
Then I try to KeyBy
it with a custom KeySelector and print it.
但是,我收到一个错误:
However, I get an error:
引起:java.lang.RuntimeException:无法从中提取密钥{"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"142valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}
Caused by: java.lang.RuntimeException: Could not extract key from {"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}
似乎 node.get("id").asText();
抛出了这个异常.
It seems that node.get("id").asText();
threw this exception.
我不知道为什么.正如我们所见,JSON 数据中确实存在一个名为 id
的键.为什么提取不出来?我是否错误地使用了 ObjectNode
类或其他原因?
I don't know why. As we see there does exist a key named id
in the JSON data. Why can't it be extracted? Have I used the class ObjectNode
wrongly or some other reason?
堆栈跟踪:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: fa9846e6834ae1391acbf51d5ad35aac)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: fa9846e6834ae1391acbf51d5ad35aac)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at myflink.StreamingJob.main(StreamingJob.java:137)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: fa9846e6834ae1391acbf51d5ad35aac)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:496)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.GeneratedMethodAccessor77.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not extract key from {"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at io.pravega.connectors.flink.FlinkPravegaReader.run(FlinkPravegaReader.java:307)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)
Caused by: java.lang.RuntimeException: Could not extract key from {"device":"rand-numeric","id":"b4728895-741f-466a-b87b-79c7590893b4","origin":"1591095418904441036","readings":[{"origin":"1591095418904328442","valueType":"Int64","name":"int","device":"rand-numeric","value":"0"}]}
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:56)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:32)
at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 9 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
at myflink.StreamingJob$1.getKey(StreamingJob.java:125)
at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:54)
... 12 more
推荐答案
您可以在此处查看 POJO 类型的规则.
You can check the rules for POJO types here.
通过使用 POJO 类型,Flink 可以推断出大量关于分布式计算过程中交换和存储的数据类型的信息.
By using POJO types, Flink can infer a lot of information about the data types that are exchanged and stored during the distributed computation.
以下代码为您的输入定义了 POJO.
The following codes define POJOs for you input.
public class FlinkPOJO {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> source =
env.addSource(new SourceFunction<String>() {
@Override
public void run(SourceContext<String> sourceContext) throws Exception {
while (true) {
sourceContext.collect("{\"device\":\"rand-numeric\",\"id\":\"b4728895-741f-466a-b87b-79c7590893b4\",\"origin\":\"1591095418904441036\",\"readings\":[{\"origin\":\"1591095418904328442\",\"valueType\":\"Int64\",\"name\":\"int\",\"device\":\"rand-numeric\",\"value\":\"0\"}]}");
Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
});
DataStream<Info> parsedSource =
source.map(new MapFunction<String, Info>() {
@Override
public Info map(String s) throws Exception {
Gson gson = new Gson();
return gson.fromJson(s, Info.class);
}
});
DataStream<String> output = parsedSource.keyBy(Info::getId).timeWindow(Time.seconds(1))
.process(new ProcessWindowFunction<Info, String, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<Info> iterable, Collector<String> collector) throws Exception {
int count = 0;
Iterator<Info> iterator = iterable.iterator();
while (iterator.hasNext()) {
count++;
iterator.next();
}
collector.collect(String.format("key : %s, size : %s", s, count));
}
});
output.print();
env.execute();
}
public class Info {
public String getDevice() {
return device;
}
public void setDevice(String device) {
this.device = device;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getOrigin() {
return origin;
}
public void setOrigin(String origin) {
this.origin = origin;
}
public Reading[] getReadings() {
return readings;
}
public void setReadings(Reading[] readings) {
this.readings = readings;
}
public String device;
public String id;
public String origin;
public Reading[] readings;
public Info() {
}
}
public class Reading {
public String origin;
public String valueType;
public String name;
public String device;
public String value;
public Reading() {
}
}
}
实际上,您可以定义一个简短的 POJO,其中只包含您需要的字段.
Actually, you can define a brief POJO which only contains the fields you need.
这篇关于Apache Flink:无法从 ObjectNode::get 中提取密钥的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!