/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.processor.internals.StateRestorer;
import org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;

/**
 * Keeps track of the state stores registered for a single task, the changelog
 * partitions backing them, and the offsets that are read from / written to the
 * task's {@code .checkpoint} file.
 *
 * <p>Two modes are selected by the {@code isStandby} constructor flag:
 * <ul>
 *   <li>non-standby: every changelog-backed store is handed to the
 *       {@link ChangelogReader} wrapped in a {@link StateRestorer};</li>
 *   <li>standby: the restore callback is kept locally (keyed by changelog topic)
 *       and applied through {@link #updateStandbyStates(TopicPartition, List)}.</li>
 * </ul>
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its maps;
 * presumably it is confined to one thread — confirm against callers.
 */
public class ProcessorStateManager
implements StateManager {
    private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
    static final String CHECKPOINT_FILE_NAME = ".checkpoint";
    private final Logger log;
    private final File baseDir;
    private final TaskId taskId;
    private final String logPrefix;
    private final boolean isStandby;
    private final ChangelogReader changelogReader;
    // Insertion-ordered, so flush() and close() visit stores in registration order.
    private final Map<String, StateStore> stores;
    private final Map<String, StateStore> globalStores;
    private final Map<TopicPartition, Long> offsetLimits;
    // Seeded from the checkpoint file in the constructor and extended by checkpoint().
    private final Map<TopicPartition, Long> checkpointedOffsets;
    // Next offset to restore per changelog partition, maintained by updateStandbyStates().
    private final Map<TopicPartition, Long> standbyRestoredOffsets;
    // Keyed by changelog topic; null when this manager is not a standby.
    private final Map<String, StateRestoreCallback> restoreCallbacks;
    private final Map<String, String> storeToChangelogTopic;
    private final List<TopicPartition> changelogPartitions = new ArrayList<>();
    // Topic name -> source partition, built from the constructor's "sources" argument.
    private final Map<String, TopicPartition> partitionForTopic;
    // Null after the file was deleted for eosEnabled; re-created lazily by checkpoint().
    private OffsetCheckpoint checkpoint;

    /**
     * Creates the manager and loads any offsets stored in the task's checkpoint file.
     *
     * @param taskId                id of the owning task; its partition is the fallback used by
     *                              the topic-to-partition lookup
     * @param sources               source partitions; each one is indexed by its topic name
     * @param isStandby             whether restore callbacks are kept locally instead of being
     *                              registered with the changelog reader
     * @param stateDirectory        supplies the task directory that holds the checkpoint file
     * @param storeToChangelogTopic store name to changelog topic; stores without an entry are
     *                              registered without a changelog
     * @param changelogReader       receives a {@link StateRestorer} per changelog-backed store
     *                              when not in standby mode
     * @param eosEnabled            when {@code true} the checkpoint file is deleted right after
     *                              its offsets have been read
     * @param logContext            factory for this instance's logger
     * @throws IOException declared by this constructor; presumably raised when the checkpoint
     *                     file cannot be read or deleted — confirm against OffsetCheckpoint
     */
    public ProcessorStateManager(TaskId taskId, Collection<TopicPartition> sources, boolean isStandby, StateDirectory stateDirectory, Map<String, String> storeToChangelogTopic, ChangelogReader changelogReader, boolean eosEnabled, LogContext logContext) throws IOException {
        this.taskId = taskId;
        this.changelogReader = changelogReader;
        this.logPrefix = String.format("task [%s] ", taskId);
        this.log = logContext.logger(this.getClass());
        this.partitionForTopic = new HashMap<>();
        for (TopicPartition source : sources) {
            this.partitionForTopic.put(source.topic(), source);
        }
        this.stores = new LinkedHashMap<>();
        this.globalStores = new HashMap<>();
        this.offsetLimits = new HashMap<>();
        this.standbyRestoredOffsets = new HashMap<>();
        this.isStandby = isStandby;
        this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
        this.storeToChangelogTopic = storeToChangelogTopic;
        this.baseDir = stateDirectory.directoryForTask(taskId);
        this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
        this.checkpointedOffsets = new HashMap<>(this.checkpoint.read());
        if (eosEnabled) {
            // The offsets were already copied into checkpointedOffsets above; the file itself
            // is removed and only re-created by the next checkpoint() call.
            this.checkpoint.delete();
            this.checkpoint = null;
        }
        this.log.debug("Created state store manager for task {} with the acquired state dir lock", taskId);
    }

    /**
     * Builds the changelog topic name for a store.
     *
     * @param applicationId application id used as the topic prefix
     * @param storeName     name of the state store
     * @return {@code applicationId + "-" + storeName + "-changelog"}
     */
    public static String storeChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    /**
     * @return the task directory obtained from the state directory at construction time
     */
    @Override
    public File baseDir() {
        return this.baseDir;
    }

    /**
     * Registers a store and, if it has a changelog topic, wires up its restoration.
     *
     * <p>Stores without a changelog mapping are only recorded. Otherwise the changelog
     * partition is remembered and, depending on the mode, the callback is either stored
     * for standby updates or wrapped in a {@link StateRestorer} and passed to the
     * changelog reader.
     *
     * @param store                the store to register
     * @param stateRestoreCallback callback used to apply changelog records to the store
     * @throws IllegalArgumentException if the store is named like the checkpoint file or a
     *                                  store with the same name is already registered
     */
    @Override
    public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
        String storeName = store.name();
        this.log.debug("Registering state store {} to its state manager", storeName);
        if (CHECKPOINT_FILE_NAME.equals(storeName)) {
            throw new IllegalArgumentException(String.format("%sIllegal store name: %s", this.logPrefix, CHECKPOINT_FILE_NAME));
        }
        if (this.stores.containsKey(storeName)) {
            throw new IllegalArgumentException(String.format("%sStore %s has already been registered.", this.logPrefix, storeName));
        }
        String topic = this.storeToChangelogTopic.get(storeName);
        if (topic == null) {
            // No changelog: nothing to restore, just remember the store.
            this.stores.put(storeName, store);
            return;
        }
        TopicPartition storePartition = new TopicPartition(topic, this.getPartition(topic));
        if (this.isStandby) {
            this.log.trace("Preparing standby replica of  state store {} with changelog topic {}", storeName, topic);
            this.restoreCallbacks.put(topic, stateRestoreCallback);
        } else {
            this.log.trace("Restoring state store {} from changelog topic {}", storeName, topic);
            StateRestorer restorer = new StateRestorer(storePartition, new CompositeRestoreListener(stateRestoreCallback), this.checkpointedOffsets.get(storePartition), this.offsetLimit(storePartition), store.persistent(), storeName);
            this.changelogReader.register(restorer);
        }
        this.changelogPartitions.add(storePartition);
        this.stores.put(storeName, store);
    }

    /**
     * Returns the checkpointed offset of every changelog partition that has a locally kept
     * restore callback, using {@code -1} for partitions without a checkpointed offset.
     *
     * @return a fresh map; empty when this manager keeps no restore callbacks (non-standby)
     */
    @Override
    public Map<TopicPartition, Long> checkpointed() {
        Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
        if (this.restoreCallbacks == null) {
            // Non-standby managers never populate restoreCallbacks; report nothing instead
            // of failing with a NullPointerException.
            return partitionsAndOffsets;
        }
        for (String topicName : this.restoreCallbacks.keySet()) {
            TopicPartition storePartition = new TopicPartition(topicName, this.getPartition(topicName));
            Long offset = this.checkpointedOffsets.get(storePartition);
            partitionsAndOffsets.put(storePartition, offset != null ? offset : -1L);
        }
        return partitionsAndOffsets;
    }

    /**
     * Applies the records whose offset is below the partition's offset limit through the
     * restore callback registered for the partition's topic, and remembers the position
     * following the last applied record.
     *
     * <p>If no record in the batch is applied, the previously remembered position is left
     * untouched; overwriting it would reset the partition's restored position to zero.
     *
     * @param storePartition changelog partition the records belong to
     * @param records        records to apply
     * @return the records at or beyond the offset limit, or {@code null} when there are none
     * @throws ProcessorStateException if the restore callback throws
     */
    List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
        long limit = this.offsetLimit(storePartition);
        List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
        List<KeyValue<byte[], byte[]>> restoreRecords = new ArrayList<>();
        BatchingStateRestoreCallback restoreCallback = this.getBatchingRestoreCallback(this.restoreCallbacks.get(storePartition.topic()));
        long lastOffset = -1L;
        int count = 0;
        for (ConsumerRecord<byte[], byte[]> record : records) {
            if (record.offset() < limit) {
                restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                lastOffset = record.offset();
            } else {
                if (remainingRecords == null) {
                    // Sized for the records not yet visited, i.e. the most that can be deferred.
                    remainingRecords = new ArrayList<>(records.size() - count);
                }
                remainingRecords.add(record);
            }
            ++count;
        }
        if (restoreRecords.isEmpty()) {
            // Nothing was applied, so the restored position must not move.
            return remainingRecords;
        }
        try {
            restoreCallback.restoreAll(restoreRecords);
        }
        catch (Exception e) {
            throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", this.logPrefix, storePartition), e);
        }
        this.standbyRestoredOffsets.put(storePartition, lastOffset + 1L);
        return remainingRecords;
    }

    /**
     * Sets the exclusive upper bound on offsets that may be restored for a partition.
     *
     * @param partition changelog partition
     * @param limit     offset limit consulted by {@code register} and {@code updateStandbyStates}
     */
    void putOffsetLimit(TopicPartition partition, long limit) {
        this.log.trace("Updating store offset limit for partition {} to {}", partition, limit);
        this.offsetLimits.put(partition, limit);
    }

    /**
     * @return the limit set via {@link #putOffsetLimit}, or {@code Long.MAX_VALUE} if none was set
     */
    private long offsetLimit(TopicPartition partition) {
        Long limit = this.offsetLimits.get(partition);
        return limit != null ? limit : Long.MAX_VALUE;
    }

    /**
     * @return the registered store with the given name, or {@code null} if there is none
     */
    @Override
    public StateStore getStore(String name) {
        return this.stores.get(name);
    }

    /**
     * Flushes every registered store in registration order. All stores are attempted even if
     * some fail; each failure is logged and the first one is rethrown afterwards.
     *
     * @throws ProcessorStateException wrapping the first exception thrown by a store
     */
    @Override
    public void flush() {
        ProcessorStateException firstException = null;
        if (!this.stores.isEmpty()) {
            this.log.debug("Flushing all stores registered in the state manager");
            for (StateStore store : this.stores.values()) {
                this.log.trace("Flushing store {}", store.name());
                try {
                    store.flush();
                }
                catch (Exception e) {
                    if (firstException == null) {
                        firstException = new ProcessorStateException(String.format("%sFailed to flush state store %s", this.logPrefix, store.name()), e);
                    }
                    this.log.error("Failed to flush state store {}: ", store.name(), e);
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    /**
     * Closes every registered store in registration order and then, if offsets were supplied,
     * writes a checkpoint. All stores are attempted even if some fail; each failure is logged
     * and the first one is rethrown after the checkpoint step.
     *
     * <p>Nothing happens (including no checkpoint) when no store is registered.
     *
     * @param ackedOffsets offsets to checkpoint, or {@code null} to skip checkpointing
     * @throws ProcessorStateException wrapping the first exception thrown by a store
     */
    @Override
    public void close(Map<TopicPartition, Long> ackedOffsets) throws ProcessorStateException {
        ProcessorStateException firstException = null;
        if (!this.stores.isEmpty()) {
            this.log.debug("Closing its state manager and all the registered state stores");
            for (StateStore store : this.stores.values()) {
                this.log.debug("Closing storage engine {}", store.name());
                try {
                    store.close();
                }
                catch (Exception e) {
                    if (firstException == null) {
                        firstException = new ProcessorStateException(String.format("%sFailed to close state store %s", this.logPrefix, store.name()), e);
                    }
                    this.log.error("Failed to close state store {}: ", store.name(), e);
                }
            }
            if (ackedOffsets != null) {
                this.checkpoint(ackedOffsets);
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    /**
     * Merges the latest known offsets into the in-memory checkpoint and writes it to the
     * checkpoint file.
     *
     * <p>Offsets reported by the changelog reader are merged first. Then, for every persistent
     * store with a changelog topic, the supplied offset plus one is recorded if present;
     * otherwise the position remembered by {@code updateStandbyStates} is used if there is one.
     * A failure to write the file is logged as a warning and not propagated.
     *
     * @param checkpointableOffsets last offsets per changelog partition; must not be {@code null}
     */
    @Override
    public void checkpoint(Map<TopicPartition, Long> checkpointableOffsets) {
        this.checkpointedOffsets.putAll(this.changelogReader.restoredOffsets());
        for (StateStore store : this.stores.values()) {
            String changelogTopic = this.storeToChangelogTopic.get(store.name());
            if (changelogTopic == null || !store.persistent()) {
                continue;
            }
            // The partition lookup is keyed by topic, exactly as in register(); using the
            // store name here could yield a different partition than the one registered.
            TopicPartition topicPartition = new TopicPartition(changelogTopic, this.getPartition(changelogTopic));
            Long ackedOffset = checkpointableOffsets.get(topicPartition);
            if (ackedOffset != null) {
                // Stored value is the position after the given offset.
                this.checkpointedOffsets.put(topicPartition, ackedOffset + 1L);
                continue;
            }
            Long standbyOffset = this.standbyRestoredOffsets.get(topicPartition);
            if (standbyOffset != null) {
                this.checkpointedOffsets.put(topicPartition, standbyOffset);
            }
        }
        if (this.checkpoint == null) {
            this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
        }
        try {
            this.checkpoint.write(this.checkpointedOffsets);
        }
        catch (IOException e) {
            this.log.warn("Failed to write checkpoint file to {}:", new File(this.baseDir, CHECKPOINT_FILE_NAME), e);
        }
    }

    /**
     * @return the partition number of the source partition for {@code topic}, or the task's
     *         own partition when the topic is not one of the sources
     */
    private int getPartition(String topic) {
        TopicPartition partition = this.partitionForTopic.get(topic);
        return partition == null ? this.taskId.partition : partition.partition();
    }

    /**
     * Makes the given global stores available through {@link #getGlobalStore(String)}.
     *
     * @param stateStores global stores, indexed by their name
     */
    void registerGlobalStateStores(List<StateStore> stateStores) {
        this.log.debug("Register global stores {}", stateStores);
        for (StateStore stateStore : stateStores) {
            this.globalStores.put(stateStore.name(), stateStore);
        }
    }

    /**
     * @return the global store with the given name, or {@code null} if there is none
     */
    @Override
    public StateStore getGlobalStore(String name) {
        return this.globalStores.get(name);
    }

    /**
     * @return {@code callback} itself if it already supports batching, otherwise a
     *         {@link WrappedBatchingStateRestoreCallback} around it
     */
    private BatchingStateRestoreCallback getBatchingRestoreCallback(StateRestoreCallback callback) {
        if (callback instanceof BatchingStateRestoreCallback) {
            return (BatchingStateRestoreCallback)callback;
        }
        return new WrappedBatchingStateRestoreCallback(callback);
    }

    /**
     * @return the live internal list of changelog partitions, in registration order
     */
    Collection<TopicPartition> changelogPartitions() {
        return this.changelogPartitions;
    }
}

