/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.test;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.StreamsTestUtils;

/**
 * In-memory processor context for tests.
 *
 * <p>State stores and their restore callbacks are kept in local maps, records are
 * forwarded synchronously to the children of the current node, and the record
 * collector (if any) is obtained from a supplier given at construction time.
 * Scheduling and committing are not supported and throw
 * {@link UnsupportedOperationException}.
 */
public class MockProcessorContext
extends AbstractProcessorContext
implements RecordCollector.Supplier {
    private final File stateDir;
    private final Metrics metrics;
    private final Serde<?> keySerde;
    private final Serde<?> valSerde;
    /** May be {@code null} when the context was built without a collector. */
    private final RecordCollector.Supplier recordCollectorSupplier;
    /** Registered stores by name; a LinkedHashMap, so iteration follows registration order. */
    private final Map<String, StateStore> storeMap = new LinkedHashMap<String, StateStore>();
    /** Restore callback registered for each store name. */
    private final Map<String, StateRestoreCallback> restoreFuncs = new HashMap<String, StateRestoreCallback>();
    /** Timestamp reported by {@link #timestamp()} while no record context is set. */
    private long timestamp = -1L;

    /**
     * Creates a context with a state directory and an explicit configuration, but
     * without serdes, record collector or cache. A fresh {@link Metrics} is created.
     *
     * @param stateDir directory returned by {@link #stateDir()}
     * @param config   streams configuration handed to the parent context
     */
    public MockProcessorContext(File stateDir, StreamsConfig config) {
        this(stateDir, null, null, new Metrics(), config, null, null);
    }

    /**
     * Creates a context without state directory or cache, taking key and value
     * serdes from {@code serdes}.
     *
     * @param serdes    source of the key and value serdes
     * @param collector collector returned by {@link #recordCollector()}
     */
    public MockProcessorContext(StateSerdes<?, ?> serdes, RecordCollector collector) {
        this(null, serdes.keySerde(), serdes.valueSerde(), collector, null);
    }

    /**
     * Creates a context without state directory or cache, using the supplied
     * metrics instance and the minimal streams configuration from
     * {@link StreamsTestUtils#minimalStreamsConfig()}.
     *
     * @param serdes    source of the key and value serdes
     * @param collector collector returned by {@link #recordCollector()}
     * @param metrics   metrics instance; closed by {@link #close()}
     */
    public MockProcessorContext(StateSerdes<?, ?> serdes, RecordCollector collector, Metrics metrics) {
        this(null, serdes.keySerde(), serdes.valueSerde(), metrics, new StreamsConfig((Map)StreamsTestUtils.minimalStreamsConfig()), supplierOf(collector), null);
    }

    /**
     * Creates a context with a fresh {@link Metrics} and the minimal streams
     * configuration from {@link StreamsTestUtils#minimalStreamsConfig()}.
     *
     * @param stateDir  directory returned by {@link #stateDir()}; may be {@code null}
     * @param keySerde  serde returned by {@link #keySerde()}
     * @param valSerde  serde returned by {@link #valueSerde()}
     * @param collector collector returned by {@link #recordCollector()}
     * @param cache     thread cache handed to the parent context
     */
    public MockProcessorContext(File stateDir, Serde<?> keySerde, Serde<?> valSerde, RecordCollector collector, ThreadCache cache) {
        this(stateDir, keySerde, valSerde, new Metrics(), new StreamsConfig((Map)StreamsTestUtils.minimalStreamsConfig()), supplierOf(collector), cache);
    }

    /** Common constructor; every public constructor delegates here. The task id is fixed to (0, 0). */
    private MockProcessorContext(File stateDir, Serde<?> keySerde, Serde<?> valSerde, Metrics metrics, StreamsConfig config, RecordCollector.Supplier collectorSupplier, ThreadCache cache) {
        super(new TaskId(0, 0), config.getString("application.id"), config, (StreamsMetrics)new MockStreamsMetrics(metrics), null, cache);
        this.stateDir = stateDir;
        this.keySerde = keySerde;
        this.valSerde = valSerde;
        this.metrics = metrics;
        this.recordCollectorSupplier = collectorSupplier;
    }

    /** Wraps a fixed collector (possibly {@code null}) in a supplier that always returns it. */
    private static RecordCollector.Supplier supplierOf(final RecordCollector collector) {
        return new RecordCollector.Supplier(){

            public RecordCollector recordCollector() {
                return collector;
            }
        };
    }

    /**
     * Returns the record collector this context was created with.
     *
     * @return the configured collector, never {@code null}
     * @throws UnsupportedOperationException if no supplier was configured or the
     *         supplier yields {@code null}
     */
    public RecordCollector recordCollector() {
        // The (File, StreamsConfig) constructor passes a null supplier; guard against it so
        // callers get the documented exception instead of a NullPointerException.
        final RecordCollector collector = this.recordCollectorSupplier == null
                ? null
                : this.recordCollectorSupplier.recordCollector();
        if (collector == null) {
            throw new UnsupportedOperationException("No RecordCollector specified");
        }
        return collector;
    }

    /** @return the key serde given at construction, possibly {@code null} */
    public Serde<?> keySerde() {
        return this.keySerde;
    }

    /** @return the value serde given at construction, possibly {@code null} */
    public Serde<?> valueSerde() {
        return this.valSerde;
    }

    /** No-op in this mock. */
    public void initialized() {
    }

    /**
     * @return the state directory given at construction
     * @throws UnsupportedOperationException if no state directory was specified
     */
    public File stateDir() {
        if (this.stateDir == null) {
            throw new UnsupportedOperationException("State directory not specified");
        }
        return this.stateDir;
    }

    /**
     * Records the store and its restore callback under the store's name, replacing
     * any earlier registration with the same name.
     *
     * @param store                              store to register
     * @param deprecatedAndIgnoredLoggingEnabled ignored
     * @param func                               callback used later by {@link #restore(String, Iterable)}
     */
    public void register(StateStore store, boolean deprecatedAndIgnoredLoggingEnabled, StateRestoreCallback func) {
        final String storeName = store.name();
        this.storeMap.put(storeName, store);
        this.restoreFuncs.put(storeName, func);
    }

    /** @return the store registered under {@code name}, or {@code null} if there is none */
    public StateStore getStateStore(String name) {
        return this.storeMap.get(name);
    }

    /** @throws UnsupportedOperationException always */
    public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
        throw new UnsupportedOperationException("schedule() not supported.");
    }

    /** @throws UnsupportedOperationException always */
    public void schedule(long interval) {
        throw new UnsupportedOperationException("schedule() not supported.");
    }

    /** @throws UnsupportedOperationException always */
    public void commit() {
        throw new UnsupportedOperationException("commit() not supported.");
    }

    /**
     * Forwards the record to every child of the current node, in the order
     * returned by {@code children()}. The current node is restored after each
     * child, even if that child throws; an exception stops the iteration.
     */
    @SuppressWarnings("rawtypes")
    public <K, V> void forward(K key, V value) {
        final ProcessorNode thisNode = this.currentNode;
        // Iterate as Object and cast: the node is handled as a raw type here.
        for (final Object child : thisNode.children()) {
            processAsCurrent((ProcessorNode) child, thisNode, key, value);
        }
    }

    /**
     * Forwards the record to the child at position {@code childIndex} of the
     * current node's children.
     */
    @SuppressWarnings("rawtypes")
    public <K, V> void forward(K key, V value, int childIndex) {
        final ProcessorNode thisNode = this.currentNode;
        final ProcessorNode childNode = (ProcessorNode) thisNode.children().get(childIndex);
        processAsCurrent(childNode, thisNode, key, value);
    }

    /**
     * Forwards the record to the first child of the current node whose name
     * equals {@code childName}. Does nothing if no child matches.
     */
    @SuppressWarnings("rawtypes")
    public <K, V> void forward(K key, V value, String childName) {
        final ProcessorNode thisNode = this.currentNode;
        for (final Object child : thisNode.children()) {
            final ProcessorNode childNode = (ProcessorNode) child;
            if (childNode.name().equals(childName)) {
                processAsCurrent(childNode, thisNode, key, value);
                // Only the first match receives the record.
                break;
            }
        }
    }

    /**
     * Makes {@code child} the current node, lets it process the record, and then
     * resets the current node to {@code parent} whether or not processing threw.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <K, V> void processAsCurrent(ProcessorNode child, ProcessorNode parent, K key, V value) {
        this.currentNode = child;
        try {
            child.process(key, value);
        }
        finally {
            this.currentNode = parent;
        }
    }

    /**
     * Sets the current time. If a record context is present it is replaced by a
     * copy carrying the new timestamp and the same offset, partition and topic;
     * the fallback timestamp field is updated in either case.
     */
    public void setTime(long timestamp) {
        if (this.recordContext != null) {
            this.recordContext = new ProcessorRecordContext(timestamp, this.recordContext.offset(), this.recordContext.partition(), this.recordContext.topic());
        }
        this.timestamp = timestamp;
    }

    /** @return the record context's timestamp, or the value last passed to {@link #setTime(long)} (initially -1) if there is no record context */
    public long timestamp() {
        return this.recordContext == null ? this.timestamp : this.recordContext.timestamp();
    }

    /** @return the record context's topic, or {@code null} if there is no record context */
    public String topic() {
        return this.recordContext == null ? null : this.recordContext.topic();
    }

    /** @return the record context's partition, or -1 if there is no record context */
    public int partition() {
        return this.recordContext == null ? -1 : this.recordContext.partition();
    }

    /** @return the record context's offset, or -1 if there is no record context */
    public long offset() {
        return this.recordContext == null ? -1L : this.recordContext.offset();
    }

    /** @return a read-only view of all registered stores, keyed by name */
    Map<String, StateStore> allStateStores() {
        return Collections.unmodifiableMap(this.storeMap);
    }

    /**
     * Replays {@code changeLog} into the restore callback registered for
     * {@code storeName}. All records are collected into a list and passed to the
     * callback in a single {@code restoreAll} call, bracketed by
     * {@code onRestoreStart} / {@code onRestoreEnd} notifications (with a
     * {@code null} partition and zero offsets/counts).
     *
     * @param storeName name the store was registered under
     * @param changeLog records to restore
     */
    public void restore(String storeName, Iterable<KeyValue<byte[], byte[]>> changeLog) {
        final BatchingStateRestoreCallback restoreCallback = this.getBatchingRestoreCallback(this.restoreFuncs.get(storeName));
        // NOTE(review): the listener is looked up on the (possibly wrapped) batching callback,
        // not on the callback originally registered — confirm this is intended.
        final StateRestoreListener restoreListener = this.getStateRestoreListener((StateRestoreCallback)restoreCallback);

        restoreListener.onRestoreStart(null, storeName, 0L, 0L);
        final ArrayList<KeyValue<byte[], byte[]>> records = new ArrayList<KeyValue<byte[], byte[]>>();
        for (final KeyValue<byte[], byte[]> record : changeLog) {
            records.add(record);
        }
        restoreCallback.restoreAll(records);
        restoreListener.onRestoreEnd(null, storeName, 0L);
    }

    /** Closes the metrics instance held by this context. */
    public void close() {
        this.metrics.close();
    }

    /**
     * Returns the callback itself when it also implements {@link StateRestoreListener};
     * otherwise the shared no-op listener.
     */
    private StateRestoreListener getStateRestoreListener(StateRestoreCallback restoreCallback) {
        return restoreCallback instanceof StateRestoreListener
                ? (StateRestoreListener)restoreCallback
                : CompositeRestoreListener.NO_OP_STATE_RESTORE_LISTENER;
    }

    /**
     * Returns the callback itself when it is already a {@link BatchingStateRestoreCallback};
     * otherwise wraps it in a {@link WrappedBatchingStateRestoreCallback}.
     */
    private BatchingStateRestoreCallback getBatchingRestoreCallback(StateRestoreCallback restoreCallback) {
        return restoreCallback instanceof BatchingStateRestoreCallback
                ? (BatchingStateRestoreCallback)restoreCallback
                : new WrappedBatchingStateRestoreCallback(restoreCallback);
    }
}

