/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the state stores registered for one task, restores them from their
 * changelog topics and persists the restored/acknowledged changelog offsets in
 * a checkpoint file located in the task's state directory.
 *
 * <p>The constructor acquires the state-directory lock for the task and
 * {@link #close(Map)} releases it. A manager created with {@code isStandby == true}
 * does not restore stores eagerly in {@link #register}; it only remembers the
 * restore callbacks so that {@link #updateStandbyStates} can apply records later.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its maps, so it
 * appears to be intended for use from a single thread - confirm against callers.
 */
public class ProcessorStateManager
implements StateManager {
    private static final Logger log = LoggerFactory.getLogger(ProcessorStateManager.class);
    public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
    public static final String CHECKPOINT_FILE_NAME = ".checkpoint";

    /** Upper bound on how long {@link #register} waits for the changelog partition to show up. */
    private static final long PARTITION_WAIT_MS = 5000L;
    /** Pause before each changelog partition metadata lookup in {@link #register}. */
    private static final long PARTITION_LOOKUP_PAUSE_MS = 50L;
    /** Timeout handed to the restore consumer's {@code poll} while restoring an active store. */
    private static final long RESTORE_POLL_TIMEOUT_MS = 100L;

    private final File baseDir;
    private final TaskId taskId;
    private final String logPrefix;
    private final boolean isStandby;
    private final StateDirectory stateDirectory;
    private final Map<String, StateStore> stores;
    private final Map<String, StateStore> globalStores;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final Map<TopicPartition, Long> offsetLimits;
    private final Map<TopicPartition, Long> restoredOffsets;
    private final Map<TopicPartition, Long> checkpointedOffsets;
    /** Changelog topic to restore callback; {@code null} unless this manager is a standby. */
    private final Map<String, StateRestoreCallback> restoreCallbacks;
    private final Map<String, String> storeToChangelogTopic;
    private final Map<String, TopicPartition> partitionForTopic;
    private final OffsetCheckpoint checkpoint;

    /**
     * Locks the task's state directory and loads the offsets stored in its checkpoint file.
     *
     * @param taskId                id of the task whose state is managed
     * @param sources               source partitions of the task; used to resolve the partition
     *                              number of a topic in preference to {@code taskId.partition}
     * @param restoreConsumer       consumer used to read changelog topics
     * @param isStandby             whether stores are kept as standby replicas instead of being
     *                              restored on registration
     * @param stateDirectory        provider of the task directory and its lock
     * @param storeToChangelogTopic store name to changelog topic; stores without an entry are
     *                              registered without any restoration
     * @throws LockException if the lock cannot be acquired or the task directory cannot be obtained
     * @throws IOException   if locking or reading the checkpoint file fails
     */
    public ProcessorStateManager(TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby, StateDirectory stateDirectory, Map<String, String> storeToChangelogTopic) throws LockException, IOException {
        this.taskId = taskId;
        this.stateDirectory = stateDirectory;
        this.logPrefix = String.format("task [%s]", taskId);
        this.partitionForTopic = new HashMap<String, TopicPartition>();
        for (TopicPartition source : sources) {
            this.partitionForTopic.put(source.topic(), source);
        }
        this.stores = new LinkedHashMap<String, StateStore>();
        this.globalStores = new HashMap<String, StateStore>();
        this.restoreConsumer = restoreConsumer;
        this.offsetLimits = new HashMap<TopicPartition, Long>();
        this.restoredOffsets = new HashMap<TopicPartition, Long>();
        this.isStandby = isStandby;
        this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
        this.storeToChangelogTopic = storeToChangelogTopic;

        if (!stateDirectory.lock(taskId, 5)) {
            throw new LockException(String.format("%s Failed to lock the state directory for task %s", this.logPrefix, taskId));
        }

        // From here on the lock is held: release it again if the rest of the
        // initialisation fails, otherwise nobody would ever unlock the directory.
        File taskDir;
        OffsetCheckpoint offsetCheckpoint;
        Map<TopicPartition, Long> loadedOffsets;
        boolean initialized = false;
        try {
            try {
                taskDir = stateDirectory.directoryForTask(taskId);
            }
            catch (ProcessorStateException e) {
                throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s", this.logPrefix, taskId, e));
            }
            offsetCheckpoint = new OffsetCheckpoint(new File(taskDir, CHECKPOINT_FILE_NAME));
            loadedOffsets = new HashMap<TopicPartition, Long>(offsetCheckpoint.read());
            initialized = true;
        }
        finally {
            if (!initialized) {
                releaseLockAfterFailedInit(stateDirectory, taskId, this.logPrefix);
            }
        }
        this.baseDir = taskDir;
        this.checkpoint = offsetCheckpoint;
        this.checkpointedOffsets = loadedOffsets;
    }

    /**
     * Releases the state-directory lock without letting an unlock failure hide the
     * exception that aborted construction.
     */
    private static void releaseLockAfterFailedInit(StateDirectory stateDirectory, TaskId taskId, String logPrefix) {
        try {
            stateDirectory.unlock(taskId);
        }
        catch (Exception unlockFailure) {
            log.warn(String.format("%s Failed to release the state directory lock for task %s", logPrefix, taskId), unlockFailure);
        }
    }

    /**
     * Builds the changelog topic name for a store.
     *
     * @return {@code applicationId + "-" + storeName + "-changelog"}
     */
    public static String storeChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
    }

    /** @return the task directory obtained from the state directory at construction time */
    @Override
    public File baseDir() {
        return this.baseDir;
    }

    /**
     * Registers a store and, if it has a changelog topic, either restores it right away
     * (active manager) or remembers its restore callback (standby manager, persistent
     * stores only).
     *
     * <p>{@code loggingEnabled} is not consulted; whether a store is restored depends only
     * on the presence of a changelog topic mapping for its name.
     *
     * @throws IllegalArgumentException if the store is named like the checkpoint file or a
     *                                  store with the same name is already registered
     * @throws StreamsException         if the changelog partition metadata cannot be fetched
     *                                  or the expected partition does not appear in time
     */
    @Override
    public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
        log.debug("{} Registering state store {} to its state manager", this.logPrefix, store.name());
        if (store.name().equals(CHECKPOINT_FILE_NAME)) {
            throw new IllegalArgumentException(String.format("%s Illegal store name: %s", this.logPrefix, CHECKPOINT_FILE_NAME));
        }
        if (this.stores.containsKey(store.name())) {
            throw new IllegalArgumentException(String.format("%s Store %s has already been registered.", this.logPrefix, store.name()));
        }

        String topic = this.storeToChangelogTopic.get(store.name());
        if (topic == null) {
            // No changelog topic: nothing to restore.
            this.stores.put(store.name(), store);
            return;
        }

        int partition = this.getPartition(topic);
        if (!this.waitForChangelogPartition(topic, partition)) {
            throw new StreamsException(String.format("%s Store %s's change log (%s) does not contain partition %s", this.logPrefix, store.name(), topic, partition));
        }

        if (this.isStandby) {
            if (store.persistent()) {
                log.trace("{} Preparing standby replica of persistent state store {} with changelog topic {}", this.logPrefix, store.name(), topic);
                this.restoreCallbacks.put(topic, stateRestoreCallback);
            }
        } else {
            log.trace("{} Restoring state store {} from changelog topic {}", this.logPrefix, store.name(), topic);
            this.restoreActiveState(topic, stateRestoreCallback);
        }
        this.stores.put(store.name(), store);
    }

    /**
     * Repeatedly looks up the partitions of {@code topic} until {@code partition} is listed
     * or {@link #PARTITION_WAIT_MS} has elapsed. Each lookup (including the first one) is
     * preceded by a short pause.
     *
     * <p>An interrupt received while pausing does not abort the wait, but the thread's
     * interrupt status is restored before this method returns or throws.
     *
     * @return {@code true} if the partition was found, {@code false} if the wait timed out
     * @throws StreamsException if the lookup times out inside the consumer or yields {@code null}
     */
    private boolean waitForChangelogPartition(String topic, int partition) {
        long deadline = System.currentTimeMillis() + PARTITION_WAIT_MS;
        boolean interrupted = false;
        try {
            do {
                try {
                    Thread.sleep(PARTITION_LOOKUP_PAUSE_MS);
                }
                catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so it is not lost.
                    interrupted = true;
                }

                List<PartitionInfo> partitions;
                try {
                    partitions = this.restoreConsumer.partitionsFor(topic);
                }
                catch (TimeoutException e) {
                    throw new StreamsException(String.format("%s Could not fetch partition info for topic: %s before expiration of the configured request timeout", this.logPrefix, topic));
                }
                if (partitions == null) {
                    throw new StreamsException(String.format("%s Could not find partition info for topic: %s", this.logPrefix, topic));
                }
                for (PartitionInfo partitionInfo : partitions) {
                    if (partitionInfo.partition() == partition) {
                        return true;
                    }
                }
            } while (System.currentTimeMillis() < deadline);
            return false;
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Replays the changelog partition of {@code topicName} through {@code stateRestoreCallback},
     * starting at the checkpointed offset (or the beginning if there is none) and stopping at
     * the offset limit or at the end offset observed before the replay started. The resulting
     * offset is stored in {@code restoredOffsets}. The restore consumer is left without any
     * assignment, whether or not the restoration succeeds.
     *
     * @throws IllegalStateException if the restore consumer already has a subscription, or if
     *                               its position moves past the end offset observed up front
     */
    private void restoreActiveState(String topicName, StateRestoreCallback stateRestoreCallback) {
        if (!this.restoreConsumer.subscription().isEmpty()) {
            throw new IllegalStateException(String.format("%s Restore consumer should have not subscribed to any partitions (%s) beforehand", this.logPrefix, this.restoreConsumer.subscription()));
        }
        TopicPartition storePartition = new TopicPartition(topicName, this.getPartition(topicName));
        this.restoreConsumer.assign(Collections.singletonList(storePartition));
        try {
            // Capture the end offset first, then rewind to where restoration has to start.
            this.restoreConsumer.seekToEnd(Collections.singleton(storePartition));
            long endOffset = this.restoreConsumer.position(storePartition);
            Long checkpointedOffset = this.checkpointedOffsets.get(storePartition);
            if (checkpointedOffset != null) {
                this.restoreConsumer.seek(storePartition, checkpointedOffset.longValue());
            } else {
                this.restoreConsumer.seekToBeginning(Collections.singleton(storePartition));
            }
            log.debug("restoring partition {} from offset {} to endOffset {}", storePartition, this.restoreConsumer.position(storePartition), endOffset);

            long limit = this.offsetLimit(storePartition);
            while (true) {
                long offset = 0L;
                for (ConsumerRecord<byte[], byte[]> record : this.restoreConsumer.poll(RESTORE_POLL_TIMEOUT_MS).records(storePartition)) {
                    offset = record.offset();
                    if (offset >= limit) {
                        break;
                    }
                    stateRestoreCallback.restore(record.key(), record.value());
                }
                if (offset >= limit) {
                    break;
                }
                long position = this.restoreConsumer.position(storePartition);
                if (position == endOffset) {
                    break;
                }
                if (position > endOffset) {
                    throw new IllegalStateException(String.format("%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d", this.logPrefix, storePartition, endOffset, position));
                }
            }

            long newOffset = Math.min(limit, this.restoreConsumer.position(storePartition));
            this.restoredOffsets.put(storePartition, newOffset);
        }
        finally {
            this.restoreConsumer.assign(Collections.<TopicPartition>emptyList());
        }
    }

    /**
     * Returns, for every changelog partition with a registered standby restore callback,
     * the checkpointed offset, or {@code -1} when no offset is checkpointed for it.
     *
     * @return a new map; empty when this manager is not a standby (no callbacks are kept then)
     */
    @Override
    public Map<TopicPartition, Long> checkpointed() {
        HashMap<TopicPartition, Long> partitionsAndOffsets = new HashMap<TopicPartition, Long>();
        if (this.restoreCallbacks == null) {
            // Only standby managers collect restore callbacks.
            return partitionsAndOffsets;
        }
        for (String topicName : this.restoreCallbacks.keySet()) {
            TopicPartition storePartition = new TopicPartition(topicName, this.getPartition(topicName));
            Long checkpointedOffset = this.checkpointedOffsets.get(storePartition);
            partitionsAndOffsets.put(storePartition, checkpointedOffset != null ? checkpointedOffset : Long.valueOf(-1L));
        }
        return partitionsAndOffsets;
    }

    /**
     * Applies the given changelog records to the standby store behind {@code storePartition}.
     * Records whose offset is below the partition's offset limit are passed to the restore
     * callback registered for the partition's topic; all others are returned to the caller.
     *
     * <p>The restored offset of the partition is advanced to one past the last applied record.
     * It is left untouched when no record was applied, so that an all-deferred or empty batch
     * does not reset previously recorded progress.
     *
     * <p>This method requires a standby manager; it dereferences the callback map that only
     * standby managers allocate.
     *
     * @return the records at or beyond the offset limit, in their original order, or
     *         {@code null} if every record was applied
     * @throws ProcessorStateException if applying a record fails
     */
    public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition storePartition, List<ConsumerRecord<byte[], byte[]>> records) {
        long limit = this.offsetLimit(storePartition);
        List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;
        StateRestoreCallback restoreCallback = this.restoreCallbacks.get(storePartition.topic());
        long lastOffset = -1L;
        int count = 0;
        for (ConsumerRecord<byte[], byte[]> record : records) {
            if (record.offset() < limit) {
                try {
                    restoreCallback.restore(record.key(), record.value());
                }
                catch (Exception e) {
                    throw new ProcessorStateException(String.format("%s exception caught while trying to restore state from %s", this.logPrefix, storePartition), e);
                }
                lastOffset = record.offset();
            } else {
                if (remainingRecords == null) {
                    remainingRecords = new ArrayList<ConsumerRecord<byte[], byte[]>>(records.size() - count);
                }
                remainingRecords.add(record);
            }
            ++count;
        }
        if (lastOffset >= 0L) {
            this.restoredOffsets.put(storePartition, lastOffset + 1L);
        }
        return remainingRecords;
    }

    /** Sets the exclusive upper bound on the offsets that may be restored for {@code partition}. */
    public void putOffsetLimit(TopicPartition partition, long limit) {
        this.offsetLimits.put(partition, limit);
    }

    /** @return the limit set for {@code partition}, or {@code Long.MAX_VALUE} if none was set */
    private long offsetLimit(TopicPartition partition) {
        Long limit = this.offsetLimits.get(partition);
        return limit != null ? limit : Long.MAX_VALUE;
    }

    /** @return the registered store with the given name, or {@code null} if there is none */
    @Override
    public StateStore getStore(String name) {
        return this.stores.get(name);
    }

    /**
     * Flushes every registered store in registration order, stopping at the first failure.
     * The {@code context} argument is not used.
     *
     * @throws ProcessorStateException if a store fails to flush
     */
    @Override
    public void flush(InternalProcessorContext context) {
        if (this.stores.isEmpty()) {
            return;
        }
        log.debug("{} Flushing all stores registered in the state manager", this.logPrefix);
        for (StateStore store : this.stores.values()) {
            try {
                log.trace("{} Flushing store={}", this.logPrefix, store.name());
                store.flush();
            }
            catch (Exception e) {
                throw new ProcessorStateException(String.format("%s Failed to flush state store %s", this.logPrefix, store.name()), e);
            }
        }
    }

    /**
     * Closes every registered store, writes the checkpoint if {@code ackedOffsets} is not
     * {@code null}, and finally releases the state-directory lock.
     *
     * <p>A store that fails to close does not prevent the remaining stores from being
     * closed; the first failure is rethrown once all stores were attempted (later failures
     * are logged) and the checkpoint is skipped in that case. The lock is released in all
     * cases. Nothing is closed or checkpointed when no store is registered.
     *
     * @throws ProcessorStateException if at least one store fails to close
     * @throws IOException             if releasing the lock fails
     */
    @Override
    public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
        try {
            if (!this.stores.isEmpty()) {
                log.debug("{} Closing its state manager and all the registered state stores", this.logPrefix);
                ProcessorStateException firstFailure = null;
                for (Map.Entry<String, StateStore> entry : this.stores.entrySet()) {
                    log.debug("{} Closing storage engine {}", this.logPrefix, entry.getKey());
                    try {
                        entry.getValue().close();
                    }
                    catch (Exception e) {
                        String message = String.format("%s Failed to close state store %s", this.logPrefix, entry.getKey());
                        if (firstFailure == null) {
                            firstFailure = new ProcessorStateException(message, e);
                        } else {
                            log.error(message, e);
                        }
                    }
                }
                if (firstFailure != null) {
                    throw firstFailure;
                }
                if (ackedOffsets != null) {
                    this.checkpoint(ackedOffsets);
                }
            }
        }
        finally {
            this.stateDirectory.unlock(this.taskId);
        }
    }

    /**
     * Updates the checkpointed offsets of all persistent stores that have a changelog topic
     * and writes them to the checkpoint file. For each such store the acknowledged offset
     * plus one is used if present, otherwise the restored offset if present; otherwise the
     * previously checkpointed value is kept. A failure to write the file is logged, not thrown.
     *
     * @param ackedOffsets last acknowledged offset per changelog partition
     */
    @Override
    public void checkpoint(Map<TopicPartition, Long> ackedOffsets) {
        for (Map.Entry<String, StateStore> entry : this.stores.entrySet()) {
            String changelogTopic = this.storeToChangelogTopic.get(entry.getKey());
            if (changelogTopic == null || !entry.getValue().persistent()) {
                continue;
            }
            // Resolve the partition from the changelog topic, exactly as restoration does,
            // so that the key matches the one used in restoredOffsets.
            TopicPartition topicPartition = new TopicPartition(changelogTopic, this.getPartition(changelogTopic));
            Long ackedOffset = ackedOffsets.get(topicPartition);
            if (ackedOffset != null) {
                this.checkpointedOffsets.put(topicPartition, ackedOffset + 1L);
                continue;
            }
            Long restoredOffset = this.restoredOffsets.get(topicPartition);
            if (restoredOffset != null) {
                this.checkpointedOffsets.put(topicPartition, restoredOffset);
            }
        }
        try {
            this.checkpoint.write(this.checkpointedOffsets);
        }
        catch (IOException e) {
            log.warn("Failed to write checkpoint file to {}", new File(this.baseDir, CHECKPOINT_FILE_NAME), e);
        }
    }

    /**
     * @return the partition number of the task's source partition for {@code topic}, or the
     *         task's own partition number when {@code topic} is not one of the source topics
     */
    private int getPartition(String topic) {
        TopicPartition partition = this.partitionForTopic.get(topic);
        return partition == null ? this.taskId.partition : partition.partition();
    }

    /** Makes the given stores available through {@link #getGlobalStore(String)}, keyed by name. */
    void registerGlobalStateStores(List<StateStore> stateStores) {
        for (StateStore stateStore : stateStores) {
            this.globalStores.put(stateStore.name(), stateStore);
        }
    }

    /** @return the global store registered under {@code name}, or {@code null} if there is none */
    @Override
    public StateStore getGlobalStore(String name) {
        return this.globalStores.get(name);
    }
}

