Failed to flush state store

Problem Description

I'm trying to create a leftJoin in Kafka Streams. It works fine for about 10 records and then crashes with an exception caused by a NullPointerException, given the following code:

private static KafkaStreams getKafkaStreams() {
    StreamsConfig streamsConfig = new StreamsConfig(getProperties());
    KStreamBuilder builder = new KStreamBuilder();

    KTable<String, Verkaeufer> umsatzTable = builder.table(Serdes.String(), EventstreamSerde.Verkaeufer(), CommonUtilsConstants.TOPIC_VERKAEUFER_STAMMDATEN);
    KStream<String, String> verkaeuferStream = builder.stream(CommonUtilsConstants.TOPIC_ANZAHL_UMSATZ_PER_VERKAEUFER);

    KStream<String, String> tuttiStream = verkaeuferStream.leftJoin(umsatzTable,
            (tutti, verkaeufer) -> ("Vorname=" + verkaeufer.getVorname().toString() + ",Nachname=" + verkaeufer.getNachname().toString() + "," + tutti.toString()), Serdes.String(), Serdes.String());

    tuttiStream.to(Serdes.String(), Serdes.String(), CommonUtilsConstants.TOPIC_TUTTI);

    return new KafkaStreams(builder, streamsConfig);
}

The StreamsConfig looks like this:

private static Properties getProperties() {
    Properties props = new Properties();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CommonUtilsConstants.BOOTSTRAP_SERVER_CONFIGURATION);
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, CommonUtilsConstants.GID_TUTTI);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "1000");

    return props;
}

Full stack trace:

22:19:36.550 [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [gid-tutti-8fe6be58-d5c5-41ce-982d-88081b98004e-StreamThread-1] Failed to commit StreamTask 0_0 state: org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KTABLE-SOURCE-STATE-STORE-0000000000
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:262)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:190)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:282)
at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:264)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:253)
at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:815)
at org.apache.kafka.streams.processor.internals.StreamThread.access$2800(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:797)
at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:789)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:778)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: java.lang.NullPointerException: null
at java.lang.String.<init>(String.java:143)
at ch.wesr.eventstream.commonutils.serde.GsonDeserializer.deserialize(GsonDeserializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:163)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:90)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:34)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:78)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:145)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:103)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:97)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:107)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:260)
... 14 common frames omitted

Update:

This is what the GsonDeserializer looks like:

import java.util.Map;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.kafka.common.serialization.Deserializer;

public class GsonDeserializer<T> implements Deserializer<T> {

    public static final String CONFIG_VALUE_CLASS = "default.value.deserializer.class";
    public static final String CONFIG_KEY_CLASS = "default.key.deserializer.class";
    private Class<T> deserializedClass;
    private Gson gson = new GsonBuilder().create();

    public GsonDeserializer() {}

    @Override
    public void configure(Map<String, ?> config, boolean isKey) {
        String configKey = isKey ? CONFIG_KEY_CLASS : CONFIG_VALUE_CLASS;
        String clsName = String.valueOf(config.get(configKey));
        try {
            if (deserializedClass == null) {
                deserializedClass = (Class<T>) Class.forName(clsName);
            }
        } catch (ClassNotFoundException e) {
            System.err.printf("Failed to configure GsonDeserializer. " +
                            "Did you forget to specify the '%s' property ?%n",
                    configKey);
            System.out.println(e.getMessage());
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        return gson.fromJson(new String(bytes), deserializedClass);
    }

    @Override
    public void close() {}
}

Recommended Answer

As long as the cache is not flushed, your deserializer is never called. That's why the join doesn't fail right at the beginning, and why you can increase the time until it fails via the cache size parameter and the commit interval (the cache is flushed on commit).
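For illustration, these are the two knobs mentioned above, set in getProperties(); the property names are from the Kafka Streams StreamsConfig API, but the values are only examples:

    // Illustrative values only: a larger cache and a longer commit interval
    // delay the flush, so a broken deserializer is called later; setting the
    // cache size to 0 disables caching and surfaces the failure immediately.
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024); // 10 MB cache
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);               // commit every 30 s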

Looking at your GsonDeserializer code, it seems that new String(bytes) fails with an NPE -- the String constructor cannot take null as a parameter. Your deserializer code must guard against bytes == null and should return null directly in that case.
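A minimal sketch of what that guard could look like in the deserialize method shown above:

@Override
public T deserialize(String s, byte[] bytes) {
    // Kafka hands the deserializer a null byte array for a null value;
    // return null instead of letting new String(null) throw an NPE.
    if (bytes == null) {
        return null;
    }
    return gson.fromJson(new String(bytes), deserializedClass);
}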
