/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.apache.kafka.streams.state.internals.RecordConverters;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.slf4j.Logger;

/**
 * Base class for state managers that share offset-checkpoint bookkeeping and the ability to
 * wipe and re-initialize individual state stores.
 *
 * <p>The checkpoint file lives at {@code <baseDir>/.checkpoint}. Fields are package-private and
 * not all of them are referenced in this class; they are presumably read and populated by
 * subclasses in this package.
 */
abstract class AbstractStateManager
implements StateManager {
    /** File name of the offset checkpoint, resolved relative to {@link #baseDir}. */
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";
    /** Directory under which the checkpoint file and the per-store directories are located. */
    final File baseDir;
    /** When {@code false}, re-initialization rewrites the checkpoint file after dropping offsets. */
    final boolean eosEnabled;
    /** Checkpoint file handle; non-final, so it may be replaced after construction. */
    OffsetCheckpoint checkpoint;
    /** In-memory offsets that get written to {@link #checkpoint}. */
    final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<TopicPartition, Long>();
    /** Stores by name, in insertion order. Not referenced in this class. */
    final Map<String, StateStore> stores = new LinkedHashMap<String, StateStore>();
    /** Global stores by name, in insertion order. Not referenced in this class. */
    final Map<String, StateStore> globalStores = new LinkedHashMap<String, StateStore>();

    /**
     * Creates the manager and a checkpoint handle for {@code <baseDir>/.checkpoint}.
     *
     * @param baseDir    directory holding the checkpoint file and the store directories
     * @param eosEnabled value for {@link #eosEnabled}
     */
    AbstractStateManager(File baseDir, boolean eosEnabled) {
        this.baseDir = baseDir;
        this.eosEnabled = eosEnabled;
        this.checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
    }

    /**
     * Selects the record converter for a store.
     *
     * @param store the store to inspect
     * @return {@link RecordConverters#rawValueToTimestampedValue()} when
     *         {@link WrappedStateStore#isTimestamped(StateStore)} reports {@code true} for the
     *         store, otherwise {@link RecordConverters#identity()}
     */
    static RecordConverter converterForStore(StateStore store) {
        return WrappedStateStore.isTimestamped(store) ? RecordConverters.rawValueToTimestampedValue() : RecordConverters.identity();
    }

    /**
     * Wipes and re-initializes every store in {@code stateStores} whose changelog topic has a
     * partition in {@code partitions}.
     *
     * <p>The given partitions are first removed from {@link #checkpointableOffsets}; when
     * {@link #eosEnabled} is {@code false} the remaining offsets are written to the checkpoint
     * file. Each affected store is then closed (best effort), the processor context is
     * uninitialized, the store is removed from {@code stateStores}, its directories
     * {@code <baseDir>/rocksdb/<storeName>} and {@code <baseDir>/<storeName>} are deleted, and
     * {@code init} is called on the store again.
     *
     * <p>A store is selected by comparing {@link StateStore#name()} with the store names found in
     * {@code storeToChangelogTopic}, but it is removed from {@code stateStores} by its map key.
     *
     * @param log                   logger used for failure reporting
     * @param stateStores           stores to consider; affected entries are removed from this map
     * @param storeToChangelogTopic store name to changelog topic name
     * @param partitions            changelog partitions whose stores must be re-initialized
     * @param processorContext      context that is uninitialized and then passed to {@code init}
     * @throws StreamsException if the checkpoint file cannot be written or a store directory
     *                          cannot be deleted
     */
    public void reinitializeStateStoresForPartitions(Logger log, Map<String, StateStore> stateStores, Map<String, String> storeToChangelogTopic, Collection<TopicPartition> partitions, InternalProcessorContext processorContext) {
        final Map<String, String> changelogTopicToStore = inverseOneToOneMap(storeToChangelogTopic);
        final HashSet<String> storesToReinitialize = new HashSet<String>();
        // Iterate over a snapshot: affected entries are removed from stateStores inside the loop.
        final Map<String, StateStore> storesSnapshot = new HashMap<String, StateStore>(stateStores);

        for (final TopicPartition topicPartition : partitions) {
            this.checkpointableOffsets.remove(topicPartition);
            storesToReinitialize.add(changelogTopicToStore.get(topicPartition.topic()));
        }

        if (!this.eosEnabled) {
            try {
                this.checkpoint.write(this.checkpointableOffsets);
            }
            catch (IOException fatalException) {
                log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", this.checkpoint, stateStores, fatalException);
                throw new StreamsException("Failed to reinitialize global store.", fatalException);
            }
        }

        for (final Map.Entry<String, StateStore> entry : storesSnapshot.entrySet()) {
            final StateStore stateStore = entry.getValue();
            final String storeName = stateStore.name();
            if (!storesToReinitialize.contains(storeName)) {
                continue;
            }

            try {
                stateStore.close();
            }
            catch (RuntimeException ignored) {
                // Best effort: the store's on-disk state is deleted right below, so a failed
                // close must not abort re-initialization. Keep a trace for diagnosis.
                log.debug("Ignoring failure to close store {} before re-initializing it.", storeName, ignored);
            }
            processorContext.uninitialize();
            stateStores.remove(entry.getKey());

            // Both layouts are removed. NOTE(review): presumably some store implementations nest
            // their directory under "rocksdb" instead of using <baseDir>/<storeName> -- confirm.
            deleteStoreDirectory(log, new File(this.baseDir + File.separator + "rocksdb" + File.separator + storeName), storeName);
            deleteStoreDirectory(log, new File(this.baseDir + File.separator + storeName), storeName);

            stateStore.init(processorContext, stateStore);
        }
    }

    /**
     * Deletes one store directory, converting an I/O failure into a {@link StreamsException}.
     *
     * @param log       logger used to report the failure
     * @param directory directory to delete
     * @param storeName store name used in the log and exception messages
     * @throws StreamsException if {@link Utils#delete(File)} throws an {@link IOException}
     */
    private static void deleteStoreDirectory(Logger log, File directory, String storeName) {
        try {
            Utils.delete(directory);
        }
        catch (IOException fatalException) {
            log.error("Failed to reinitialize store {}.", storeName, fatalException);
            throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
        }
    }

    /**
     * Builds the value-to-key inverse of a map.
     *
     * <p>The input is expected to be one-to-one; if two keys share a value, the entry that is
     * iterated last wins.
     *
     * @param origin map to invert
     * @return a new map from each value of {@code origin} to its key
     */
    private static Map<String, String> inverseOneToOneMap(Map<String, String> origin) {
        final Map<String, String> reversedMap = new HashMap<String, String>();
        for (final Map.Entry<String, String> entry : origin.entrySet()) {
            reversedMap.put(entry.getValue(), entry.getKey());
        }
        return reversedMap;
    }
}

