/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.test;

import java.io.File;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.junit.rules.ExternalResource;

@Deprecated
public class KStreamTestDriver
extends ExternalResource {
    private static final long DEFAULT_CACHE_SIZE_BYTES = 0x100000L;
    private ProcessorTopology topology;
    private InternalMockProcessorContext context;
    private ProcessorTopology globalTopology;
    private final LogContext logContext = new LogContext("testCache ");

    public void setUp(StreamsBuilder builder) {
        this.setUp(builder, null, Serdes.ByteArray(), Serdes.ByteArray());
    }

    public void setUp(StreamsBuilder builder, File stateDir) {
        this.setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray());
    }

    public void setUp(StreamsBuilder builder, File stateDir, long cacheSize) {
        this.setUp(builder, stateDir, Serdes.ByteArray(), Serdes.ByteArray(), cacheSize);
    }

    public void setUp(StreamsBuilder builder, File stateDir, Serde<?> keySerde, Serde<?> valSerde) {
        this.setUp(builder, stateDir, keySerde, valSerde, 0x100000L);
    }

    public void setUp(StreamsBuilder builder, File stateDir, Serde<?> keySerde, Serde<?> valSerde, long cacheSize) {
        InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
        internalTopologyBuilder.setApplicationId("TestDriver");
        this.topology = internalTopologyBuilder.build(null);
        this.globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
        ThreadCache cache = new ThreadCache(this.logContext, cacheSize, (StreamsMetricsImpl)new MockStreamsMetrics(new Metrics()));
        this.context = new InternalMockProcessorContext(stateDir, keySerde, valSerde, (RecordCollector)new MockRecordCollector(), cache);
        this.context.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "topic", null));
        if (this.globalTopology != null) {
            this.initTopology(this.globalTopology, this.globalTopology.globalStateStores());
        }
        this.initTopology(this.topology, this.topology.stateStores());
    }

    protected void after() {
        if (this.topology != null) {
            this.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initTopology(ProcessorTopology topology, List<StateStore> stores) {
        for (StateStore store : stores) {
            try {
                store.init((ProcessorContext)this.context, store);
            }
            catch (RuntimeException e) {
                new RuntimeException("Fatal exception initializing store.", e).printStackTrace();
                throw e;
            }
        }
        for (ProcessorNode node : topology.processors()) {
            this.context.setCurrentNode(node);
            try {
                node.init((InternalProcessorContext)this.context);
            }
            finally {
                this.context.setCurrentNode(null);
            }
        }
    }

    public ProcessorTopology topology() {
        return this.topology;
    }

    public ProcessorContext context() {
        return this.context;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void process(String topicName, Object key, Object value) {
        ProcessorNode prevNode = this.context.currentNode();
        ProcessorNode currNode = this.sourceNodeByTopicName(topicName);
        if (currNode != null) {
            this.context.setRecordContext(this.createRecordContext(topicName, this.context.timestamp()));
            this.context.setCurrentNode(currNode);
            try {
                this.context.forward(key, value);
            }
            finally {
                this.context.setCurrentNode(prevNode);
            }
        }
    }

    private ProcessorNode sourceNodeByTopicName(String topicName) {
        SourceNode topicNode = this.topology.source(topicName);
        if (topicNode == null) {
            for (String sourceTopic : this.topology.sourceTopics()) {
                if (!Pattern.compile(sourceTopic).matcher(topicName).matches()) continue;
                return this.topology.source(sourceTopic);
            }
            if (this.globalTopology != null) {
                topicNode = this.globalTopology.source(topicName);
            }
        }
        return topicNode;
    }

    public void setTime(long timestamp) {
        this.context.setTime(timestamp);
    }

    public void close() {
        for (ProcessorNode node : this.topology.processors()) {
            this.context.setCurrentNode(node);
            try {
                node.close();
            }
            finally {
                this.context.setCurrentNode(null);
            }
        }
        this.closeState();
    }

    public Set<String> allProcessorNames() {
        HashSet<String> names = new HashSet<String>();
        List nodes = this.topology.processors();
        for (ProcessorNode node : nodes) {
            names.add(node.name());
        }
        return names;
    }

    public ProcessorNode processor(String name) {
        List nodes = this.topology.processors();
        for (ProcessorNode node : nodes) {
            if (!node.name().equals(name)) continue;
            return node;
        }
        return null;
    }

    public Map<String, StateStore> allStateStores() {
        return this.context.allStateStores();
    }

    public void flushState() {
        for (StateStore stateStore : this.context.allStateStores().values()) {
            stateStore.flush();
        }
    }

    private void closeState() {
        this.flushState();
        for (StateStore stateStore : this.context.allStateStores().values()) {
            stateStore.close();
        }
    }

    private ProcessorRecordContext createRecordContext(String topicName, long timestamp) {
        return new ProcessorRecordContext(timestamp, -1L, -1, topicName, null);
    }

    private class MockRecordCollector
    extends RecordCollectorImpl {
        MockRecordCollector() {
            super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver "), (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        }

        public <K, V> void send(String topic, K key, V value, Headers headers, Long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<? super K, ? super V> partitioner) {
            if (KStreamTestDriver.this.sourceNodeByTopicName(topic) != null) {
                KStreamTestDriver.this.process(topic, key, value);
            }
        }

        public <K, V> void send(String topic, K key, V value, Headers headers, Integer partition, Long timestamp, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
            if (KStreamTestDriver.this.sourceNodeByTopicName(topic) != null) {
                KStreamTestDriver.this.process(topic, key, value);
            }
        }

        public void flush() {
        }

        public void close() {
        }
    }
}

