/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
import org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.OwnedTaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates, hands out and releases the {@link TaskLocalStateStore} instances used by the subtasks
 * running on this task executor. Stores are grouped by the {@link AllocationID} they belong to so
 * that all local state of one allocation can be released together.
 *
 * <p>The registry of stores and the {@code closed} flag are only accessed while holding
 * {@code lock}. Disposing stores and deleting directories is done after the lock has been released.
 */
public class TaskExecutorLocalStateStoresManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);

    /** Registered stores, keyed first by allocation id and then by (job vertex id, subtask index). */
    @GuardedBy("lock")
    private final Map<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> taskStateStoresByAllocationID = new HashMap<>();

    /** Flag passed on to the {@link LocalRecoveryConfig} of every store created by this manager. */
    private final boolean localRecoveryEnabled;

    /** Root directories below which one sub-directory per allocation id is used. */
    private final File[] localStateRootDirectories;

    /** Executor handed to every {@link TaskLocalStateStoreImpl} created by this manager. */
    private final Executor discardExecutor;

    /** Monitor guarding {@link #taskStateStoresByAllocationID} and {@link #closed}. */
    private final Object lock;

    /** Handle returned when registering {@link #shutdown()} as a shutdown hook; used to deregister it. */
    private final Thread shutdownHook;

    /** Set to {@code true} by the first call to {@link #shutdown()}. */
    @GuardedBy("lock")
    private boolean closed;

    /**
     * Creates the manager, makes sure that all root directories exist and registers
     * {@link #shutdown()} as a shutdown hook.
     *
     * @param localRecoveryEnabled whether stores created by this manager are configured for local recovery
     * @param localStateRootDirectories root directories for local state; missing ones are created
     * @param discardExecutor executor passed to the created {@link TaskLocalStateStoreImpl} instances
     * @throws IOException if one of the root directories does not exist and could not be created
     */
    public TaskExecutorLocalStateStoresManager(boolean localRecoveryEnabled, @Nonnull File[] localStateRootDirectories, @Nonnull Executor discardExecutor) throws IOException {
        this.localRecoveryEnabled = localRecoveryEnabled;
        this.localStateRootDirectories = localStateRootDirectories;
        this.discardExecutor = discardExecutor;
        this.lock = new Object();
        this.closed = false;

        for (File localStateRecoveryRootDir : localStateRootDirectories) {
            // mkdirs() also returns false when the directory already exists, so exists() is
            // checked once more afterwards to tolerate a directory that appeared in between.
            if (!localStateRecoveryRootDir.exists()
                    && !localStateRecoveryRootDir.mkdirs()
                    && !localStateRecoveryRootDir.exists()) {
                throw new IOException("Could not create root directory for local recovery: " + localStateRecoveryRootDir);
            }
        }

        // Registered last, after all fields have been assigned.
        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG);
    }

    /**
     * Returns the local state store for the given subtask under the given allocation. An already
     * registered store for the same (allocation id, job vertex id, subtask index) is reused;
     * otherwise a new store is created and registered.
     *
     * <p>A newly created store is a {@link TaskLocalStateStoreImpl} if its
     * {@link LocalRecoveryConfig} reports local recovery as enabled, and a
     * {@link NoOpTaskLocalStateStoreImpl} otherwise.
     *
     * @param jobId id of the job the subtask belongs to
     * @param allocationID id of the allocation under which the store is registered
     * @param jobVertexID id of the job vertex of the subtask
     * @param subtaskIndex index of the subtask
     * @return the existing or newly created store
     * @throws IllegalStateException if {@link #shutdown()} has already been called
     */
    @Nonnull
    public TaskLocalStateStore localStateStoreForSubtask(@Nonnull JobID jobId, @Nonnull AllocationID allocationID, @Nonnull JobVertexID jobVertexID, @Nonnegative int subtaskIndex) {
        synchronized (lock) {
            if (closed) {
                throw new IllegalStateException("TaskExecutorLocalStateStoresManager is already closed and cannot register a new TaskLocalStateStore.");
            }

            Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore> taskStateManagers = taskStateStoresByAllocationID.get(allocationID);
            if (taskStateManagers == null) {
                taskStateManagers = new HashMap<>();
                taskStateStoresByAllocationID.put(allocationID, taskStateManagers);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Registered new allocation id {} for local state stores for job {}.", allocationID.toHexString(), jobId);
                }
            }

            final JobVertexSubtaskKey taskKey = new JobVertexSubtaskKey(jobVertexID, subtaskIndex);
            OwnedTaskLocalStateStore taskLocalStateStore = taskStateManagers.get(taskKey);

            if (taskLocalStateStore == null) {
                // The store works in per-allocation sub-directories of the configured root directories.
                final File[] allocationBaseDirectories = allocationBaseDirectories(allocationID);
                final LocalRecoveryDirectoryProviderImpl directoryProvider = new LocalRecoveryDirectoryProviderImpl(allocationBaseDirectories, jobId, jobVertexID, subtaskIndex);
                final LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(localRecoveryEnabled, directoryProvider);

                if (localRecoveryConfig.isLocalRecoveryEnabled()) {
                    taskLocalStateStore = new TaskLocalStateStoreImpl(jobId, allocationID, jobVertexID, subtaskIndex, localRecoveryConfig, discardExecutor);
                } else {
                    taskLocalStateStore = new NoOpTaskLocalStateStoreImpl(localRecoveryConfig);
                }

                taskStateManagers.put(taskKey, taskLocalStateStore);
                LOG.debug("Registered new local state store with configuration {} for {} - {} - {} under allocation id {}.", localRecoveryConfig, jobId, jobVertexID, subtaskIndex, allocationID);
            } else {
                LOG.debug("Found existing local state store for {} - {} - {} under allocation id {}: {}", jobId, jobVertexID, subtaskIndex, allocationID, taskLocalStateStore);
            }

            return taskLocalStateStore;
        }
    }

    /**
     * Unregisters and disposes all stores registered under the given allocation id and deletes the
     * allocation's base directories. Does nothing if the manager has already been shut down.
     *
     * @param allocationID id of the allocation whose local state is released
     */
    public void releaseLocalStateForAllocationId(@Nonnull AllocationID allocationID) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Releasing local state under allocation id {}.", allocationID);
        }

        final Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore> cleanupLocalStores;
        synchronized (lock) {
            if (closed) {
                return;
            }
            cleanupLocalStores = taskStateStoresByAllocationID.remove(allocationID);
        }

        // Disposal and directory deletion happen outside the lock.
        if (cleanupLocalStores != null) {
            doRelease(cleanupLocalStores.values());
        }
        cleanupAllocationBaseDirs(allocationID);
    }

    /**
     * Closes this manager: deregisters the shutdown hook, disposes every registered store and
     * deletes the base directories of every registered allocation. Only the first call has an
     * effect; later calls return immediately.
     */
    public void shutdown() {
        final Map<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> toRelease;
        synchronized (lock) {
            if (closed) {
                return;
            }
            closed = true;
            // Take a snapshot so that the actual cleanup can run without holding the lock.
            toRelease = new HashMap<>(taskStateStoresByAllocationID);
            taskStateStoresByAllocationID.clear();
        }

        ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
        LOG.info("Shutting down TaskExecutorLocalStateStoresManager.");

        for (Map.Entry<AllocationID, Map<JobVertexSubtaskKey, OwnedTaskLocalStateStore>> entry : toRelease.entrySet()) {
            doRelease(entry.getValue().values());
            cleanupAllocationBaseDirs(entry.getKey());
        }
    }

    /** Returns the local recovery flag this manager was created with. */
    @VisibleForTesting
    boolean isLocalRecoveryEnabled() {
        return localRecoveryEnabled;
    }

    /** Returns the root directory array this manager was created with (not a copy). */
    @VisibleForTesting
    File[] getLocalStateRootDirectories() {
        return localStateRootDirectories;
    }

    /** Returns the sub-directory name used for the given allocation: {@code "aid_"} plus its hex string. */
    @VisibleForTesting
    String allocationSubDirString(AllocationID allocationID) {
        return "aid_" + allocationID.toHexString();
    }

    /**
     * Builds the base directories of the given allocation, one below each root directory and in the
     * same order as the root directories.
     */
    private File[] allocationBaseDirectories(AllocationID allocationID) {
        final String allocationSubDirString = allocationSubDirString(allocationID);
        final File[] allocationDirectories = new File[localStateRootDirectories.length];
        for (int i = 0; i < localStateRootDirectories.length; ++i) {
            allocationDirectories[i] = new File(localStateRootDirectories[i], allocationSubDirString);
        }
        return allocationDirectories;
    }

    /**
     * Disposes every given store. An exception thrown by one store is logged and does not prevent
     * the remaining stores from being disposed. A {@code null} argument is ignored.
     */
    private void doRelease(Iterable<OwnedTaskLocalStateStore> toRelease) {
        if (toRelease == null) {
            return;
        }
        for (OwnedTaskLocalStateStore stateStore : toRelease) {
            try {
                stateStore.dispose();
            } catch (Exception disposeEx) {
                LOG.warn("Exception while disposing local state store {}.", stateStore, disposeEx);
            }
        }
    }

    /**
     * Deletes all base directories of the given allocation. A failure to delete one directory is
     * logged and the remaining directories are still processed.
     */
    private void cleanupAllocationBaseDirs(AllocationID allocationID) {
        for (File directory : allocationBaseDirectories(allocationID)) {
            try {
                FileUtils.deleteFileOrDirectory(directory);
            } catch (IOException e) {
                LOG.warn("Exception while deleting local state directory for allocation id {}.", allocationID, e);
            }
        }
    }

    /** Map key identifying one subtask of a job vertex; equality is based on both fields. */
    private static final class JobVertexSubtaskKey {
        @Nonnull
        final JobVertexID jobVertexID;
        @Nonnegative
        final int subtaskIndex;

        JobVertexSubtaskKey(@Nonnull JobVertexID jobVertexID, @Nonnegative int subtaskIndex) {
            this.jobVertexID = jobVertexID;
            this.subtaskIndex = subtaskIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            JobVertexSubtaskKey that = (JobVertexSubtaskKey) o;
            return subtaskIndex == that.subtaskIndex && jobVertexID.equals(that.jobVertexID);
        }

        @Override
        public int hashCode() {
            int result = jobVertexID.hashCode();
            result = 31 * result + subtaskIndex;
            return result;
        }
    }
}

