/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.test;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.processor.internals.StateManagerStub;
import org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.ToInternal;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.StreamsTestUtils;

/**
 * Processor context for tests: it keeps registered state stores and their restore callbacks in
 * in-memory maps, forwards records synchronously to the children of the current node, and exposes
 * setters for the serdes, the state manager and the current timestamp.
 *
 * <p>Optional collaborators (state directory, record collector) may be left unset; the matching
 * accessors then throw {@link UnsupportedOperationException} instead of returning {@code null}.
 *
 * @param <KOut> type of keys that may be forwarded through this context
 * @param <VOut> type of values that may be forwarded through this context
 */
public class InternalMockProcessorContext<KOut, VOut>
    extends AbstractProcessorContext<KOut, VOut>
    implements RecordCollector.Supplier {

    /** Client id handed to every {@link StreamsMetricsImpl} created by the convenience constructors. */
    private static final String MOCK_CLIENT_ID = "mock";
    /** Built-in metrics version used when the caller does not supply a config to read it from. */
    private static final String LATEST_METRICS_VERSION = "latest";
    /** Config key under which the built-in metrics version is looked up. */
    private static final String BUILT_IN_METRICS_VERSION_KEY = "built.in.metrics.version";

    private StateManager stateManager = new StateManagerStub();
    private final File stateDir;
    private final RecordCollector.Supplier recordCollectorSupplier;
    // Insertion-ordered so stores are kept in registration order.
    private final Map<String, StateStore> storeMap = new LinkedHashMap<>();
    private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<>();
    private final ToInternal toInternal = new ToInternal();
    private Task.TaskType taskType = Task.TaskType.ACTIVE;
    private Serde<?> keySerde;
    private Serde<?> valueSerde;
    // Only consulted while no record context is set; see timestamp().
    private long timestamp = -1L;
    private final Time time;
    private final Map<String, String> storeToChangelogTopic = new HashMap<>();

    /** Creates a context with no state directory, serdes, record collector or cache. */
    public InternalMockProcessorContext() {
        this(null, null, null, newMockMetrics(new Metrics(), LATEST_METRICS_VERSION), defaultConfig(), null, null, Time.SYSTEM);
    }

    /**
     * Creates a context backed by the given state directory and config; the built-in metrics
     * version is read from {@code config}.
     */
    public InternalMockProcessorContext(final File stateDir, final StreamsConfig config) {
        this(stateDir, null, null, newMockMetrics(new Metrics(), config.getString(BUILT_IN_METRICS_VERSION_KEY)), config, null, null, Time.SYSTEM);
    }

    /** Creates a context that reports into the given metrics and uses the default test config. */
    public InternalMockProcessorContext(final StreamsMetricsImpl streamsMetrics) {
        this(null, null, null, streamsMetrics, defaultConfig(), null, null, Time.SYSTEM);
    }

    /**
     * Creates a context backed by the given state directory, config and record collector; the
     * built-in metrics version is read from {@code config}.
     */
    public InternalMockProcessorContext(final File stateDir, final StreamsConfig config, final RecordCollector collector) {
        this(stateDir, null, null, newMockMetrics(new Metrics(), config.getString(BUILT_IN_METRICS_VERSION_KEY)), config, () -> collector, null, Time.SYSTEM);
    }

    /** Creates a context with the given state directory, serdes and config, but no record collector. */
    public InternalMockProcessorContext(final File stateDir, final Serde<?> keySerde, final Serde<?> valueSerde, final StreamsConfig config) {
        this(stateDir, keySerde, valueSerde, newMockMetrics(new Metrics(), LATEST_METRICS_VERSION), config, null, null, Time.SYSTEM);
    }

    /** Creates a context that takes its key/value serdes from {@code serdes}, with no state directory or cache. */
    public InternalMockProcessorContext(final StateSerdes<?, ?> serdes, final RecordCollector collector) {
        this(null, serdes.keySerde(), serdes.valueSerde(), collector, null);
    }

    /** Same as {@link #InternalMockProcessorContext(StateSerdes, RecordCollector)} but reporting into {@code metrics}. */
    public InternalMockProcessorContext(final StateSerdes<?, ?> serdes, final RecordCollector collector, final Metrics metrics) {
        this(null, serdes.keySerde(), serdes.valueSerde(), newMockMetrics(metrics, LATEST_METRICS_VERSION), defaultConfig(), () -> collector, null, Time.SYSTEM);
    }

    /** Creates a context with the given state directory, serdes, record collector and cache, using the default test config. */
    public InternalMockProcessorContext(final File stateDir, final Serde<?> keySerde, final Serde<?> valueSerde, final RecordCollector collector, final ThreadCache cache) {
        this(stateDir, keySerde, valueSerde, newMockMetrics(new Metrics(), LATEST_METRICS_VERSION), defaultConfig(), () -> collector, cache, Time.SYSTEM);
    }

    /**
     * Canonical constructor; every other constructor delegates here.
     *
     * @param stateDir          state directory, or {@code null} to make {@link #stateDir()} throw
     * @param keySerde          key serde returned by {@link #keySerde()}; may be {@code null}
     * @param valueSerde        value serde returned by {@link #valueSerde()}; may be {@code null}
     * @param metrics           metrics passed to the superclass
     * @param config            streams config passed to the superclass
     * @param collectorSupplier source of the record collector, or {@code null} to make
     *                          {@link #recordCollector()} throw
     * @param cache             thread cache passed to the superclass; may be {@code null}
     * @param time              clock used by {@link #currentSystemTimeMs()}
     */
    public InternalMockProcessorContext(final File stateDir,
                                        final Serde<?> keySerde,
                                        final Serde<?> valueSerde,
                                        final StreamsMetricsImpl metrics,
                                        final StreamsConfig config,
                                        final RecordCollector.Supplier collectorSupplier,
                                        final ThreadCache cache,
                                        final Time time) {
        super(new TaskId(0, 0), config, metrics, cache);
        super.setCurrentNode(new ProcessorNode<>("TESTING_NODE"));
        this.stateDir = stateDir;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.recordCollectorSupplier = collectorSupplier;
        this.time = time;
    }

    /** Builds the metrics instance shared by the convenience constructors, backed by a fresh {@link MockTime}. */
    private static StreamsMetricsImpl newMockMetrics(final Metrics metrics, final String builtInMetricsVersion) {
        return new StreamsMetricsImpl(metrics, MOCK_CLIENT_ID, builtInMetricsVersion, new MockTime());
    }

    /** Builds a config from the default properties supplied by {@link StreamsTestUtils#getStreamsConfig()}. */
    private static StreamsConfig defaultConfig() {
        return new StreamsConfig(StreamsTestUtils.getStreamsConfig());
    }

    @Override
    protected StateManager stateManager() {
        return stateManager;
    }

    /** Replaces the state manager that {@link #register(StateStore, StateRestoreCallback)} delegates to. */
    public void setStateManger(final StateManager stateManger) {
        this.stateManager = stateManger;
    }

    /**
     * Returns the configured record collector.
     *
     * @throws UnsupportedOperationException if no supplier was configured or the supplier yields {@code null}
     */
    @Override
    public RecordCollector recordCollector() {
        // Several constructors leave the supplier unset; report that the same way as a
        // supplier that yields null rather than failing with a NullPointerException.
        final RecordCollector recordCollector =
            recordCollectorSupplier == null ? null : recordCollectorSupplier.recordCollector();
        if (recordCollector == null) {
            throw new UnsupportedOperationException("No RecordCollector specified");
        }
        return recordCollector;
    }

    /** Overrides the key serde returned by {@link #keySerde()}. */
    public void setKeySerde(final Serde<?> keySerde) {
        this.keySerde = keySerde;
    }

    /** Overrides the value serde returned by {@link #valueSerde()}. */
    public void setValueSerde(final Serde<?> valueSerde) {
        this.valueSerde = valueSerde;
    }

    @Override
    public Serde<?> keySerde() {
        return keySerde;
    }

    @Override
    public Serde<?> valueSerde() {
        return valueSerde;
    }

    /** No-op: this context has nothing to initialize. */
    @Override
    public void initialize() {
    }

    /**
     * Returns the state directory supplied at construction.
     *
     * @throws UnsupportedOperationException if no state directory was supplied
     */
    @Override
    public File stateDir() {
        if (stateDir == null) {
            throw new UnsupportedOperationException("State directory not specified");
        }
        return stateDir;
    }

    /**
     * Records the store and its restore callback under the store's name, then registers both
     * with the current state manager.
     */
    @Override
    public void register(final StateStore store, final StateRestoreCallback func) {
        storeMap.put(store.name(), store);
        restoreFuncs.put(store.name(), func);
        stateManager().registerStore(store, func);
    }

    /**
     * Returns the store previously registered under {@code name}, or {@code null} if there is none.
     * The cast to {@code S} is unchecked; the caller is responsible for asking for the right type.
     */
    @SuppressWarnings("unchecked")
    @Override
    public <S extends StateStore> S getStateStore(final String name) {
        return (S) storeMap.get(name);
    }

    /**
     * Not supported by this context.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Cancellable schedule(final Duration interval,
                                final PunctuationType type,
                                final Punctuator callback) throws IllegalArgumentException {
        throw new UnsupportedOperationException("schedule() not supported.");
    }

    /** No-op: commit requests are ignored. */
    @Override
    public void commit() {
    }

    /** Forwards {@code record} to every child of the current node. */
    @Override
    public <K extends KOut, V extends VOut> void forward(final Record<K, V> record) {
        forward(record, null);
    }

    /**
     * Forwards {@code record} to the child of the current node named {@code childName}, or to every
     * child when {@code childName} is {@code null}. Children whose name does not match are skipped.
     *
     * <p>If a record context is set and the record's timestamp differs from it, the context's
     * timestamp is updated first. The current node is restored once forwarding finishes, even if a
     * child throws.
     */
    @SuppressWarnings("unchecked")
    @Override
    public <K extends KOut, V extends VOut> void forward(final Record<K, V> record, final String childName) {
        if (recordContext != null && record.timestamp() != recordContext.timestamp()) {
            setTime(record.timestamp());
        }
        final ProcessorNode<?, ?, ?, ?> thisNode = currentNode;
        try {
            for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
                // Honor the requested target, mirroring the child filter of forward(key, value, To).
                if (childName != null && !childName.equals(childNode.name())) {
                    continue;
                }
                currentNode = childNode;
                ((ProcessorNode<K, V, ?, ?>) childNode).process(record);
            }
        } finally {
            currentNode = thisNode;
        }
    }

    /** Forwards the key/value pair to every child of the current node via {@link To#all()}. */
    @Override
    public void forward(final Object key, final Object value) {
        forward(key, value, To.all());
    }

    /**
     * Forwards the key/value pair to the children of the current node selected by {@code to}.
     *
     * <p>If {@code to} carries a timestamp, the context's timestamp is updated to it and the
     * forwarded record uses it; otherwise the record uses the context's current {@link #timestamp()}.
     * The record's headers come from {@link #headers()}. The current node is restored once
     * forwarding finishes, even if a child throws.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void forward(final Object key, final Object value, final To to) {
        toInternal.update(to);
        if (toInternal.hasTimestamp()) {
            setTime(toInternal.timestamp());
        }
        final ProcessorNode<?, ?, ?, ?> thisNode = currentNode;
        try {
            for (final ProcessorNode<?, ?, ?, ?> childNode : thisNode.children()) {
                final String targetChild = toInternal.child();
                if (targetChild != null && !targetChild.equals(childNode.name())) {
                    continue;
                }
                currentNode = childNode;
                // A To without an explicit timestamp has no usable timestamp of its own,
                // so fall back to the one this context currently reports.
                final long recordTimestamp = toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp();
                final Record<Object, Object> record = new Record<>(key, value, recordTimestamp, headers());
                ((ProcessorNode<Object, Object, ?, ?>) childNode).process(record);
                // The shared toInternal may have been overwritten by a nested forward() issued
                // while the child was processing; reset it before looking at the next child.
                toInternal.update(to);
            }
        } finally {
            currentNode = thisNode;
        }
    }

    /**
     * Sets the current timestamp. When a record context is present it is replaced by a copy that
     * differs only in its timestamp; the standalone timestamp field is updated in either case.
     */
    public void setTime(final long timestamp) {
        if (recordContext != null) {
            recordContext = new ProcessorRecordContext(
                timestamp,
                recordContext.offset(),
                recordContext.partition(),
                recordContext.topic(),
                recordContext.headers());
        }
        this.timestamp = timestamp;
    }

    /** Returns the record context's timestamp, or the value last given to {@link #setTime(long)} (initially -1) when no record context is set. */
    @Override
    public long timestamp() {
        if (recordContext == null) {
            return timestamp;
        }
        return recordContext.timestamp();
    }

    /** Returns the current time in milliseconds as reported by the {@link Time} supplied at construction. */
    @Override
    public long currentSystemTimeMs() {
        return time.milliseconds();
    }

    /**
     * Not supported by this context.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public long currentStreamTimeMs() {
        throw new UnsupportedOperationException("this method is not supported in InternalMockProcessorContext");
    }

    /** Returns the record context's topic, or {@code null} when no record context is set. */
    @Override
    public String topic() {
        if (recordContext == null) {
            return null;
        }
        return recordContext.topic();
    }

    /** Returns the record context's partition, or -1 when no record context is set. */
    @Override
    public int partition() {
        if (recordContext == null) {
            return -1;
        }
        return recordContext.partition();
    }

    /** Returns the record context's offset, or -1 when no record context is set. */
    @Override
    public long offset() {
        if (recordContext == null) {
            return -1L;
        }
        return recordContext.offset();
    }

    /** Returns the record context's headers, or a new empty {@link RecordHeaders} when no record context is set. */
    @Override
    public Headers headers() {
        if (recordContext == null) {
            return new RecordHeaders();
        }
        return recordContext.headers();
    }

    @Override
    public Task.TaskType taskType() {
        return taskType;
    }

    /**
     * Sends the change to the record collector under the topic {@code storeName + "-changelog"},
     * using this task's partition and the given timestamp.
     *
     * @throws UnsupportedOperationException if no record collector is available
     */
    @Override
    public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) {
        recordCollector().send(
            storeName + "-changelog",
            key,
            value,
            null,
            taskId().partition(),
            timestamp,
            BYTES_KEY_SERIALIZER,
            BYTEARRAY_VALUE_SERIALIZER);
    }

    /** Marks this context as belonging to an active task; all arguments are ignored. */
    @Override
    public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) {
        taskType = Task.TaskType.ACTIVE;
    }

    /** Marks this context as belonging to a standby task; the argument is ignored. */
    @Override
    public void transitionToStandby(final ThreadCache newCache) {
        taskType = Task.TaskType.STANDBY;
    }

    /** Registers {@code listener} on this context's cache for the given namespace. */
    @Override
    public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) {
        cache().addDirtyEntryFlushListener(namespace, listener);
    }

    /**
     * Replays {@code changeLog} into the restore callback registered for {@code storeName}.
     * Each entry is wrapped in a {@link ConsumerRecord} with an empty topic, partition 0 and
     * offset 0, and the whole batch is handed to the callback in a single call.
     */
    public void restore(final String storeName, final Iterable<KeyValue<byte[], byte[]>> changeLog) {
        final RecordBatchingStateRestoreCallback restoreCallback =
            StateRestoreCallbackAdapter.adapt(restoreFuncs.get(storeName));
        final ArrayList<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
        for (final KeyValue<byte[], byte[]> keyValue : changeLog) {
            records.add(new ConsumerRecord<>("", 0, 0L, keyValue.key, keyValue.value));
        }
        restoreCallback.restoreBatch(records);
    }

    /** Associates {@code storeName} with {@code changelogTopic} for later lookup via {@link #changelogFor(String)}. */
    public void addChangelogForStore(final String storeName, final String changelogTopic) {
        storeToChangelogTopic.put(storeName, changelogTopic);
    }

    /** Returns the changelog topic added for {@code storeName}, or {@code null} if none was added. */
    @Override
    public String changelogFor(final String storeName) {
        return storeToChangelogTopic.get(storeName);
    }
}

