/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.GlobalStateManager;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GlobalStateManagerImpl
implements GlobalStateManager {
    private static final int MAX_LOCK_ATTEMPTS = 5;
    private static final Logger log = LoggerFactory.getLogger(GlobalStateManagerImpl.class);
    private final ProcessorTopology topology;
    private final Consumer<byte[], byte[]> consumer;
    private final StateDirectory stateDirectory;
    private final Map<String, StateStore> stores = new LinkedHashMap<String, StateStore>();
    private final File baseDir;
    private final OffsetCheckpoint checkpoint;
    private final Set<String> globalStoreNames = new HashSet<String>();
    private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<TopicPartition, Long>();
    private final Set<String> globalNonPersistentStoresTopics = new HashSet<String>();

    public GlobalStateManagerImpl(ProcessorTopology topology, Consumer<byte[], byte[]> consumer, StateDirectory stateDirectory) {
        this.topology = topology;
        this.consumer = consumer;
        this.stateDirectory = stateDirectory;
        this.baseDir = stateDirectory.globalStateDir();
        this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, ".checkpoint"));
        Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
        for (StateStore store : topology.globalStateStores()) {
            if (store.persistent()) continue;
            this.globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
        }
    }

    @Override
    public Set<String> initialize(InternalProcessorContext processorContext) {
        try {
            if (!this.stateDirectory.lockGlobalState(5)) {
                throw new LockException(String.format("Failed to lock the global state directory: %s", this.baseDir));
            }
        }
        catch (IOException e) {
            throw new LockException(String.format("Failed to lock the global state directory: %s", this.baseDir));
        }
        try {
            this.checkpointableOffsets.putAll(this.checkpoint.read());
        }
        catch (IOException e) {
            try {
                this.stateDirectory.unlockGlobalState();
            }
            catch (IOException e1) {
                log.error("failed to unlock the global state directory", (Throwable)e);
            }
            throw new StreamsException("Failed to read checkpoints for global state stores", e);
        }
        List<StateStore> stateStores = this.topology.globalStateStores();
        for (StateStore stateStore : stateStores) {
            this.globalStoreNames.add(stateStore.name());
            stateStore.init(processorContext, stateStore);
        }
        return Collections.unmodifiableSet(this.globalStoreNames);
    }

    @Override
    public StateStore getGlobalStore(String name) {
        return this.stores.get(name);
    }

    @Override
    public StateStore getStore(String name) {
        return this.getGlobalStore(name);
    }

    @Override
    public File baseDir() {
        return this.baseDir;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void register(StateStore store, boolean ignored, StateRestoreCallback stateRestoreCallback) {
        if (this.stores.containsKey(store.name())) {
            throw new IllegalArgumentException(String.format("Global Store %s has already been registered", store.name()));
        }
        if (!this.globalStoreNames.contains(store.name())) {
            throw new IllegalArgumentException(String.format("Trying to register store %s that is not a known global store", store.name()));
        }
        if (stateRestoreCallback == null) {
            throw new IllegalArgumentException(String.format("The stateRestoreCallback provided for store %s was null", store.name()));
        }
        log.info("restoring state for global store {}", (Object)store.name());
        List<TopicPartition> topicPartitions = this.topicPartitionsForStore(store);
        this.consumer.assign(topicPartitions);
        Map highWatermarks = this.consumer.endOffsets(topicPartitions);
        try {
            this.restoreState(stateRestoreCallback, topicPartitions, highWatermarks);
            this.stores.put(store.name(), store);
        }
        finally {
            this.consumer.assign(Collections.emptyList());
        }
    }

    private List<TopicPartition> topicPartitionsForStore(StateStore store) {
        String sourceTopic = this.topology.storeToChangelogTopic().get(store.name());
        List partitionInfos = this.consumer.partitionsFor(sourceTopic);
        if (partitionInfos == null || partitionInfos.isEmpty()) {
            throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name()));
        }
        ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
        for (PartitionInfo partition : partitionInfos) {
            topicPartitions.add(new TopicPartition(partition.topic(), partition.partition()));
        }
        return topicPartitions;
    }

    private void restoreState(StateRestoreCallback stateRestoreCallback, List<TopicPartition> topicPartitions, Map<TopicPartition, Long> highWatermarks) {
        for (TopicPartition topicPartition : topicPartitions) {
            this.consumer.assign(Collections.singletonList(topicPartition));
            Long checkpoint = this.checkpointableOffsets.get(topicPartition);
            if (checkpoint != null) {
                this.consumer.seek(topicPartition, checkpoint.longValue());
            } else {
                this.consumer.seekToBeginning(Collections.singletonList(topicPartition));
            }
            long offset = this.consumer.position(topicPartition);
            Long highWatermark = highWatermarks.get(topicPartition);
            while (offset < highWatermark) {
                ConsumerRecords records = this.consumer.poll(100L);
                for (ConsumerRecord record : records) {
                    offset = record.offset() + 1L;
                    stateRestoreCallback.restore((byte[])record.key(), (byte[])record.value());
                }
            }
            this.checkpointableOffsets.put(topicPartition, offset);
        }
    }

    @Override
    public void flush(InternalProcessorContext context) {
        log.debug("Flushing all global stores registered in the state manager");
        for (StateStore store : this.stores.values()) {
            try {
                log.trace("Flushing global store={}", (Object)store.name());
                store.flush();
            }
            catch (Exception e) {
                throw new ProcessorStateException(String.format("Failed to flush global state store %s", store.name()), e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close(Map<TopicPartition, Long> offsets) throws IOException {
        try {
            if (this.stores.isEmpty()) {
                return;
            }
            StringBuilder closeFailed = new StringBuilder();
            for (Map.Entry<String, StateStore> entry : this.stores.entrySet()) {
                log.debug("Closing global storage engine {}", (Object)entry.getKey());
                try {
                    entry.getValue().close();
                }
                catch (Exception e) {
                    log.error("Failed to close global state store {}", (Object)entry.getKey(), (Object)e);
                    closeFailed.append("Failed to close global state store:").append(entry.getKey()).append(". Reason: ").append(e.getMessage()).append("\n");
                }
            }
            this.stores.clear();
            if (closeFailed.length() > 0) {
                throw new ProcessorStateException("Exceptions caught during close of 1 or more global state stores\n" + closeFailed);
            }
            this.checkpoint(offsets);
        }
        finally {
            this.stateDirectory.unlockGlobalState();
        }
    }

    @Override
    public void checkpoint(Map<TopicPartition, Long> offsets) {
        this.checkpointableOffsets.putAll(offsets);
        HashMap<TopicPartition, Long> filteredOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, Long> topicPartitionOffset : this.checkpointableOffsets.entrySet()) {
            String topic = topicPartitionOffset.getKey().topic();
            if (this.globalNonPersistentStoresTopics.contains(topic)) continue;
            filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
        }
        if (!filteredOffsets.isEmpty()) {
            try {
                this.checkpoint.write(filteredOffsets);
            }
            catch (IOException e) {
                log.warn("failed to write offsets checkpoint for global stores", (Throwable)e);
            }
        }
    }

    @Override
    public Map<TopicPartition, Long> checkpointed() {
        return Collections.unmodifiableMap(this.checkpointableOffsets);
    }
}

