/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.TestUtils;

public class ProcessorTopologyTestDriver {
    private static final String APPLICATION_ID = "test-driver-application";
    private static final int PARTITION_ID = 0;
    private static final TaskId TASK_ID = new TaskId(0, 0);
    private final ProcessorTopology topology;
    private final MockProducer<byte[], byte[]> producer;
    private final Map<String, TopicPartition> partitionsByTopic = new HashMap<String, TopicPartition>();
    private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<TopicPartition, AtomicLong>();
    private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<String, Queue<ProducerRecord<byte[], byte[]>>>();
    private final Set<String> internalTopics = new HashSet<String>();
    private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<String, TopicPartition>();
    private StreamTask task;
    private GlobalStateUpdateTask globalStateTask;

    public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder) {
        this.topology = builder.setApplicationId(APPLICATION_ID).build(null);
        ProcessorTopology globalTopology = builder.buildGlobalStateTopology();
        MockConsumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        ByteArraySerializer bytesSerializer = new ByteArraySerializer();
        this.producer = new MockProducer<byte[], byte[]>(true, (Serializer)bytesSerializer, (Serializer)bytesSerializer){

            public List<PartitionInfo> partitionsFor(String topic) {
                return Collections.singletonList(new PartitionInfo(topic, 0, null, null, null));
            }
        };
        for (TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) {
            this.internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
        }
        for (String topic : this.topology.sourceTopics()) {
            TopicPartition tp = new TopicPartition(topic, 0);
            this.partitionsByTopic.put(topic, tp);
            this.offsetsByTopicPartition.put(tp, new AtomicLong());
        }
        consumer.assign(this.offsetsByTopicPartition.keySet());
        StateDirectory stateDirectory = new StateDirectory(APPLICATION_ID, TestUtils.tempDirectory().getPath(), Time.SYSTEM);
        MockStreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
        ThreadCache cache = new ThreadCache("mock", 0x100000L, (StreamsMetrics)streamsMetrics);
        if (globalTopology != null) {
            MockConsumer<byte[], byte[]> globalConsumer = this.createGlobalConsumer();
            for (String topicName : globalTopology.sourceTopics()) {
                ArrayList<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>();
                partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null));
                globalConsumer.updatePartitions(topicName, partitionInfos);
                TopicPartition partition = new TopicPartition(topicName, 1);
                globalConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L));
                this.globalPartitionsByTopic.put(topicName, partition);
                this.offsetsByTopicPartition.put(partition, new AtomicLong());
            }
            GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory);
            this.globalStateTask = new GlobalStateUpdateTask(globalTopology, (InternalProcessorContext)new GlobalProcessorContextImpl(config, (StateManager)stateManager, (StreamsMetrics)streamsMetrics, cache), (GlobalStateManager)stateManager);
            this.globalStateTask.initialize();
        }
        if (!this.partitionsByTopic.isEmpty()) {
            this.task = new StreamTask(TASK_ID, APPLICATION_ID, this.partitionsByTopic.values(), this.topology, (Consumer)consumer, (ChangelogReader)new StoreChangelogReader(this.createRestoreConsumer(this.topology.storeToChangelogTopic())), config, (StreamsMetrics)streamsMetrics, stateDirectory, cache, (Time)new MockTime(), this.producer);
            this.task.initializeStateStores();
            this.task.initializeTopology();
        }
    }

    private void process(String topicName, byte[] key, byte[] value, long timestamp) {
        TopicPartition tp = this.partitionsByTopic.get(topicName);
        if (tp != null) {
            long offset = this.offsetsByTopicPartition.get(tp).incrementAndGet();
            this.task.addRecords(tp, this.records((ConsumerRecord<byte[], byte[]>)new ConsumerRecord(tp.topic(), tp.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)key, (Object)value)));
            this.producer.clear();
            this.task.process();
            ((InternalProcessorContext)this.task.context()).setRecordContext((RecordContext)new ProcessorRecordContext(timestamp, offset, tp.partition(), topicName));
            this.task.commit();
            for (ProducerRecord record : this.producer.history()) {
                Queue<ProducerRecord<byte[], byte[]>> outputRecords = this.outputRecordsByTopic.get(record.topic());
                if (outputRecords == null) {
                    outputRecords = new LinkedList<ProducerRecord<byte[], byte[]>>();
                    this.outputRecordsByTopic.put(record.topic(), outputRecords);
                }
                outputRecords.add((ProducerRecord<byte[], byte[]>)record);
                if (!this.internalTopics.contains(record.topic()) && !this.topology.sourceTopics().contains(record.topic())) continue;
                this.process(record.topic(), (byte[])record.key(), (byte[])record.value(), record.timestamp());
            }
        } else {
            TopicPartition global = this.globalPartitionsByTopic.get(topicName);
            if (global == null) {
                throw new IllegalArgumentException("Unexpected topic: " + topicName);
            }
            long offset = this.offsetsByTopicPartition.get(global).incrementAndGet();
            this.globalStateTask.update(new ConsumerRecord(global.topic(), global.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, (Object)key, (Object)value));
            this.globalStateTask.flushState();
        }
    }

    public void process(String topicName, byte[] key, byte[] value) {
        this.process(topicName, key, value, 0L);
    }

    public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value));
    }

    public ProducerRecord<byte[], byte[]> readOutput(String topic) {
        Queue<ProducerRecord<byte[], byte[]>> outputRecords = this.outputRecordsByTopic.get(topic);
        if (outputRecords == null) {
            return null;
        }
        return outputRecords.poll();
    }

    public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        ProducerRecord<byte[], byte[]> record = this.readOutput(topic);
        if (record == null) {
            return null;
        }
        Object key = keyDeserializer.deserialize(record.topic(), (byte[])record.key());
        Object value = valueDeserializer.deserialize(record.topic(), (byte[])record.value());
        return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), key, value);
    }

    private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> record) {
        return Collections.singleton(record);
    }

    private StateStore getStateStore(String name) {
        return ((ProcessorContextImpl)this.task.context()).getStateMgr().getStore(name);
    }

    public <K, V> KeyValueStore<K, V> getKeyValueStore(String name) {
        StateStore store = this.getStateStore(name);
        return store instanceof KeyValueStore ? (KeyValueStore)this.getStateStore(name) : null;
    }

    public void close() {
        if (this.task != null) {
            this.task.close(true, false);
        }
        if (this.globalStateTask != null) {
            try {
                this.globalStateTask.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }

    private MockConsumer<byte[], byte[]> createRestoreConsumer(Map<String, String> storeToChangelogTopic) {
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST){

            public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
            }

            public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
            }

            public synchronized long position(TopicPartition partition) {
                return 0L;
            }
        };
        for (Map.Entry<String, String> storeAndTopic : storeToChangelogTopic.entrySet()) {
            String topicName = storeAndTopic.getValue();
            ArrayList<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>();
            partitionInfos.add(new PartitionInfo(topicName, 0, null, null, null));
            consumer.updatePartitions(topicName, partitionInfos);
            consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, 0), 0L));
        }
        return consumer;
    }

    private MockConsumer<byte[], byte[]> createGlobalConsumer() {
        return new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST){

            public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
            }

            public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
            }

            public synchronized long position(TopicPartition partition) {
                return 0L;
            }
        };
    }
}

