/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.StoreChangeLogger;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

class ChangeLoggingWindowBytesStore
extends WrappedStateStore.AbstractStateStore
implements WindowStore<Bytes, byte[]> {
    private final WindowStore<Bytes, byte[]> bytesStore;
    private final boolean retainDuplicates;
    private StoreChangeLogger<Bytes, byte[]> changeLogger;
    private ProcessorContext context;
    private int seqnum = 0;

    ChangeLoggingWindowBytesStore(WindowStore<Bytes, byte[]> bytesStore, boolean retainDuplicates) {
        super(bytesStore);
        this.bytesStore = bytesStore;
        this.retainDuplicates = retainDuplicates;
    }

    @Override
    public byte[] fetch(Bytes key, long timestamp) {
        return (byte[])this.bytesStore.fetch(key, timestamp);
    }

    @Override
    public WindowStoreIterator<byte[]> fetch(Bytes key, long from, long to) {
        return this.bytesStore.fetch(key, from, to);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long from, long to) {
        return this.bytesStore.fetch(keyFrom, keyTo, from, to);
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
        return this.bytesStore.all();
    }

    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(long timeFrom, long timeTo) {
        return this.bytesStore.fetchAll(timeFrom, timeTo);
    }

    @Override
    public void put(Bytes key, byte[] value) {
        this.put(key, value, this.context.timestamp());
    }

    @Override
    public void put(Bytes key, byte[] value, long timestamp) {
        this.bytesStore.put(key, value, timestamp);
        this.changeLogger.logChange(WindowKeySchema.toStoreKeyBinary(key, timestamp, this.maybeUpdateSeqnumForDups()), value);
    }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        this.context = context;
        this.bytesStore.init(context, root);
        String topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), this.bytesStore.name());
        this.changeLogger = new StoreChangeLogger(this.name(), context, new StateSerdes(topic, Serdes.Bytes(), Serdes.ByteArray()));
    }

    private int maybeUpdateSeqnumForDups() {
        if (this.retainDuplicates) {
            this.seqnum = this.seqnum + 1 & Integer.MAX_VALUE;
        }
        return this.seqnum;
    }
}

