/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.suppress;

import java.util.Objects;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy;
import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal;
import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.internals.ContextualRecord;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

/**
 * Processor that holds {@link Change} records back in a {@link TimeOrderedKeyValueBuffer}
 * and forwards them downstream once they are evicted from that buffer.
 *
 * <p>Every incoming record is serialized and put into the buffer. After each insert the
 * processor evicts (and emits) all records whose buffer time is at or before
 * {@code streamTime - suppressDurationMillis}, and then applies the configured
 * {@link BufferFullStrategy} if the buffer still exceeds its record or byte limit.
 *
 * @param <K> key type of the processed records
 * @param <V> value type wrapped by the processed {@link Change}
 */
public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
    private final long maxRecords;
    private final long maxBytes;
    private final long suppressDurationMillis;
    private final TimeDefinitions.TimeDefinition<K> bufferTimeDefinition;
    private final BufferFullStrategy bufferFullStrategy;
    private final boolean shouldSuppressTombstones;
    private final String storeName;
    // Assigned in init(): looked up from the processor context by store name.
    private TimeOrderedKeyValueBuffer buffer;
    private InternalProcessorContext internalProcessorContext;
    // May be null after construction; init() then falls back to the context's serdes.
    private Serde<K> keySerde;
    private FullChangeSerde<V> valueSerde;

    /**
     * Creates the processor from a suppression specification.
     *
     * @param suppress   suppression settings (buffer limits, wait duration, time definition,
     *                   buffer-full strategy, tombstone handling); must not be {@code null}
     * @param storeName  name under which the buffer state store is looked up in {@link #init}
     * @param keySerde   key serde, or {@code null} to use the context's key serde
     * @param valueSerde value serde, or {@code null} to wrap the context's value serde
     * @throws NullPointerException if {@code suppress} is {@code null}
     */
    public KTableSuppressProcessor(SuppressedInternal<K> suppress, String storeName, Serde<K> keySerde, FullChangeSerde<V> valueSerde) {
        Objects.requireNonNull(suppress, "suppress");
        this.storeName = storeName;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.maxRecords = suppress.bufferConfig().maxRecords();
        this.maxBytes = suppress.bufferConfig().maxBytes();
        this.suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
        this.bufferTimeDefinition = suppress.timeDefinition();
        this.bufferFullStrategy = suppress.bufferConfig().bufferFullStrategy();
        this.shouldSuppressTombstones = suppress.shouldSuppressTombstones();
    }

    /**
     * Binds the processor to its context, resolves any serde that was not supplied to the
     * constructor, and looks up the buffer state store.
     *
     * @param context the processor context; cast to {@link InternalProcessorContext}
     * @throws NullPointerException if the context returns no state store for the store name
     */
    @Override
    public void init(ProcessorContext context) {
        this.internalProcessorContext = (InternalProcessorContext) context;
        if (this.keySerde == null) {
            // The context's key serde is not typed as Serde<K>; this processor is declared
            // over K, so the cast reflects that declaration rather than a runtime check.
            @SuppressWarnings("unchecked")
            Serde<K> contextKeySerde = (Serde<K>) context.keySerde();
            this.keySerde = contextKeySerde;
        }
        if (this.valueSerde == null) {
            this.valueSerde = FullChangeSerde.castOrWrap(context.valueSerde());
        }
        this.buffer = Objects.requireNonNull(
            (TimeOrderedKeyValueBuffer) context.getStateStore(this.storeName),
            "No state store named [" + this.storeName + "] was returned by the processor context");
    }

    /**
     * Buffers the record, then emits whatever the time and capacity constraints require.
     *
     * @param key   record key
     * @param value record change
     */
    @Override
    public void process(K key, Change<V> value) {
        this.buffer(key, value);
        this.enforceConstraints();
    }

    /**
     * Serializes the record and stores it in the buffer together with the current record
     * context, under the time computed by the configured time definition.
     */
    private void buffer(K key, Change<V> value) {
        long bufferTime = this.bufferTimeDefinition.time(this.internalProcessorContext, key);
        ProcessorRecordContext recordContext = this.internalProcessorContext.recordContext();
        Bytes serializedKey = Bytes.wrap(this.keySerde.serializer().serialize(null, key));
        byte[] serializedValue = this.valueSerde.serializer().serialize(null, value);
        this.buffer.put(bufferTime, serializedKey, new ContextualRecord(serializedValue, recordContext));
    }

    /**
     * Emits all records whose buffer time has expired, then handles an over-capacity buffer
     * according to the configured {@link BufferFullStrategy}.
     *
     * @throws StreamsException              if the buffer is over capacity and the strategy is
     *                                       {@code SHUT_DOWN}
     * @throws UnsupportedOperationException if the buffer is over capacity and the strategy is
     *                                       not handled by this processor
     */
    private void enforceConstraints() {
        long streamTime = this.internalProcessorContext.streamTime();
        long expiryTime = streamTime - this.suppressDurationMillis;
        this.buffer.evictWhile(() -> this.buffer.minTimestamp() <= expiryTime, this::emit);
        if (!this.overCapacity()) {
            return;
        }
        switch (this.bufferFullStrategy) {
            case EMIT:
                // Keep evicting (and forwarding) until the buffer is back within its limits.
                this.buffer.evictWhile(this::overCapacity, this::emit);
                return;
            case SHUT_DOWN:
                throw new StreamsException(String.format(
                    "%s buffer exceeded its max capacity. Currently [%d/%d] records and [%d/%d] bytes.",
                    this.internalProcessorContext.currentNode().name(),
                    this.buffer.numRecords(), this.maxRecords,
                    this.buffer.bufferSize(), this.maxBytes));
            default:
                // Fail loudly instead of silently leaving the buffer above its configured limits.
                throw new UnsupportedOperationException(
                    "The bufferFullStrategy [" + this.bufferFullStrategy + "] is not implemented.");
        }
    }

    /**
     * @return {@code true} if the buffer holds more records than {@code maxRecords} or more
     *         bytes than {@code maxBytes}
     */
    private boolean overCapacity() {
        return (long) this.buffer.numRecords() > this.maxRecords || this.buffer.bufferSize() > this.maxBytes;
    }

    /**
     * Deserializes an evicted buffer entry and forwards it downstream, unless it is filtered
     * out by {@link #shouldForward}.
     *
     * <p>While forwarding, the context's record context is temporarily replaced with the one
     * stored alongside the record; the previous record context is always restored afterwards,
     * even if deserializing the key or forwarding throws.
     */
    private void emit(KeyValue<Bytes, ContextualRecord> toEmit) {
        Change<V> value = this.valueSerde.deserializer().deserialize(null, toEmit.value.value());
        if (!this.shouldForward(value)) {
            return;
        }
        ProcessorRecordContext prevRecordContext = this.internalProcessorContext.recordContext();
        this.internalProcessorContext.setRecordContext(toEmit.value.recordContext());
        try {
            K key = this.keySerde.deserializer().deserialize(null, toEmit.key.get());
            this.internalProcessorContext.forward(key, value);
        } finally {
            this.internalProcessorContext.setRecordContext(prevRecordContext);
        }
    }

    /**
     * @return {@code true} unless the change has a {@code null} new value and tombstone
     *         suppression is enabled
     */
    private boolean shouldForward(Change<V> value) {
        return value.newValue != null || !this.shouldSuppressTombstones;
    }

    /** No-op: this processor does nothing on close. */
    @Override
    public void close() {
    }
}

