Failed to flush state store


Problem description

I'm trying to create a leftJoin in Kafka Streams. It works fine for about 10 records and then crashes with an exception caused by a NullPointerException, using code like this:

private static KafkaStreams getKafkaStreams() {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    KStreamBuilder builder = new KStreamBuilder();

    KTable<String, Verkaeufer> umsatzTable = builder.table(Serdes.String(), EventstreamSerde.Verkaeufer(), CommonUtilsConstants.TOPIC_VERKAEUFER_STAMMDATEN);
    KStream<String, String> verkaeuferStream = builder.stream(CommonUtilsConstants.TOPIC_ANZAHL_UMSATZ_PER_VERKAEUFER);

    KStream<String, String> tuttiStream = verkaeuferStream.leftJoin(umsatzTable,
            (tutti, verkaeufer) -> ("Vorname=" + verkaeufer.getVorname().toString() + ",Nachname=" + verkaeufer.getNachname().toString() + "," + tutti.toString()),
            Serdes.String(), Serdes.String());

    tuttiStream.to(Serdes.String(), Serdes.String(), CommonUtilsConstants.TOPIC_TUTTI);

    return new KafkaStreams(builder, streamsConfig);
}

The StreamsConfig looks like this:

private static Properties getProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CommonUtilsConstants.BOOTSTRAP_SERVER_CONFIGURATION);
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, CommonUtilsConstants.GID_TUTTI);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "1000");

    return props;
}

Full stack trace:

22:19:36.550 [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] Failed to commit StreamTask 0_0 state: org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KTABLE-SOURCE-STATE-STORE-0000000000
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:262)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:190)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:282)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: java.lang.NullPointerException: null
at java.lang.String.<init>(String.java:143)
at ch.wesr.eventstream.commonutils.serde.GsonDeserializer.deserialize(GsonDeserializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:90)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:145)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:103)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:97)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:260)
... 14 common frames omitted

Update:

This is what the GsonDeserializer looks like:

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "default.value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "default.key.deserializer.class";
    private Class<T> deserializedClass;
    private Gson gson = new GsonBuilder().create();

    public GsonDeserializer() {}

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));
        try {
            if (deserializedClass == null) {
                deserializedClass = (Class<T>) Class.forName(clsName);
            }
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                            "Did you forget to specify the '%s' property ?%n",
                    configKey);
            System.out.println(e.getMessage());
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        return gson.fromJson(new String(bytes), deserializedClass);
    }

    @Override
    public void close() {}
}

Answer

As long as the cache is not flushed, your deserializer is never called. That's why it doesn't fail in the beginning, and why you can increase the time until it fails via the cache size parameter and the commit interval (we flush on commit).
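For illustration, a minimal sketch of the two settings involved. The constant names are the actual StreamsConfig keys; the values are arbitrary examples chosen to make the failure surface immediately rather than after ~10 records:

// A smaller cache and a shorter commit interval mean an earlier flush,
// so the deserializer is exercised sooner. Setting the cache size to 0
// disables record caching entirely and pushes every record through the
// serdes right away.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100"); // in milliseconds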

Looking at your code for GsonDeserializer, it seems that new String(bytes) fails with the NPE -- the String constructor cannot take null as a parameter. Your deserializer code must guard against bytes == null and should return null directly for this case.
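A minimal sketch of that guard, applied to the deserialize method from the question (the rest of the class stays unchanged):

@Override
public T deserialize(String s, byte[] bytes) {
    // Kafka passes null for absent/tombstone values; new String(null)
    // throws a NullPointerException, so short-circuit and return null.
    if (bytes == null) {
        return null;
    }
    return gson.fromJson(new String(bytes), deserializedClass);
}

With the null guard in place, the cache flush during commit no longer throws.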
