/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.CacheFlushListener;

class TimestampedCacheFlushListener<KOut, VOut>
implements CacheFlushListener<KOut, ValueAndTimestamp<VOut>> {
    private final InternalProcessorContext<KOut, Change<VOut>> context;
    private final ProcessorNode myNode;

    TimestampedCacheFlushListener(ProcessorContext<KOut, Change<VOut>> context) {
        this.context = (InternalProcessorContext)context;
        this.myNode = this.context.currentNode();
    }

    @Override
    public void apply(Record<KOut, Change<ValueAndTimestamp<VOut>>> record) {
        ProcessorNode<?, ?, ?, ?> prev = this.context.currentNode();
        this.context.setCurrentNode(this.myNode);
        try {
            this.context.forward(record.withValue(new Change(ValueAndTimestamp.getValueOrNull((ValueAndTimestamp)record.value().newValue), ValueAndTimestamp.getValueOrNull((ValueAndTimestamp)record.value().oldValue))).withTimestamp(record.value().newValue != null ? ((ValueAndTimestamp)record.value().newValue).timestamp() : record.timestamp()));
        }
        finally {
            this.context.setCurrentNode(prev);
        }
    }
}

