/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStore;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStoreUtil;
import org.apache.flink.runtime.jobmanager.ExecutionPlanStoreWatcher;
import org.apache.flink.runtime.persistence.ResourceVersion;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultExecutionPlanStore<R extends ResourceVersion<R>>
implements ExecutionPlanStore,
ExecutionPlanStore.ExecutionPlanListener {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionPlanStore.class);
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private final Set<JobID> addedExecutionPlans = new HashSet<JobID>();
    private final StateHandleStore<ExecutionPlan, R> executionPlanStateHandleStore;
    @GuardedBy(value="lock")
    private final ExecutionPlanStoreWatcher executionPlanStoreWatcher;
    private final ExecutionPlanStoreUtil executionPlanStoreUtil;
    @GuardedBy(value="lock")
    private ExecutionPlanStore.ExecutionPlanListener executionPlanListener;
    @GuardedBy(value="lock")
    private volatile boolean running;

    public DefaultExecutionPlanStore(StateHandleStore<ExecutionPlan, R> stateHandleStore, ExecutionPlanStoreWatcher executionPlanStoreWatcher, ExecutionPlanStoreUtil executionPlanStoreUtil) {
        this.executionPlanStateHandleStore = (StateHandleStore)Preconditions.checkNotNull(stateHandleStore);
        this.executionPlanStoreWatcher = (ExecutionPlanStoreWatcher)Preconditions.checkNotNull((Object)executionPlanStoreWatcher);
        this.executionPlanStoreUtil = (ExecutionPlanStoreUtil)Preconditions.checkNotNull((Object)executionPlanStoreUtil);
        this.running = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start(ExecutionPlanStore.ExecutionPlanListener executionPlanListener) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (!this.running) {
                this.executionPlanListener = (ExecutionPlanStore.ExecutionPlanListener)Preconditions.checkNotNull((Object)executionPlanListener);
                this.executionPlanStoreWatcher.start(this);
                this.running = true;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                this.running = false;
                LOG.info("Stopping DefaultExecutionPlanStore.");
                Exception exception = null;
                try {
                    this.executionPlanStateHandleStore.releaseAll();
                }
                catch (Exception e) {
                    exception = e;
                }
                try {
                    this.executionPlanStoreWatcher.stop();
                }
                catch (Exception e) {
                    exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
                }
                if (exception != null) {
                    throw new FlinkException("Could not properly stop the DefaultExecutionPlanStore.", (Throwable)exception);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    @Nullable
    public ExecutionPlan recoverExecutionPlan(JobID jobId) throws Exception {
        Preconditions.checkNotNull((Object)jobId, (String)"Job ID");
        LOG.debug("Recovering execution plan {} from {}.", (Object)jobId, this.executionPlanStateHandleStore);
        String name = this.executionPlanStoreUtil.jobIDToName(jobId);
        Object object = this.lock;
        synchronized (object) {
            ExecutionPlan executionPlan;
            boolean success;
            block13: {
                RetrievableStateHandle<ExecutionPlan> executionPlanRetrievableStateHandle;
                this.verifyIsRunning();
                success = false;
                try {
                    executionPlanRetrievableStateHandle = this.executionPlanStateHandleStore.getAndLock(name);
                }
                catch (StateHandleStore.NotExistException ignored) {
                    success = true;
                    ExecutionPlan executionPlan2 = null;
                    if (success) return executionPlan2;
                    this.executionPlanStateHandleStore.release(name);
                    return executionPlan2;
                }
                catch (Exception e) {
                    throw new FlinkException("Could not retrieve the submitted execution plan state handle for " + name + " from the submitted execution plan store.", (Throwable)e);
                }
                executionPlan = executionPlanRetrievableStateHandle.retrieveState();
                break block13;
                catch (ClassNotFoundException cnfe) {
                    throw new FlinkException("Could not retrieve submitted ExecutionPlan from state handle under " + name + ". This indicates that you are trying to recover from state written by an older Flink version which is not compatible. Try cleaning the state handle store.", (Throwable)cnfe);
                }
                catch (IOException ioe) {
                    throw new FlinkException("Could not retrieve submitted ExecutionPlan from state handle under " + name + ". This indicates that the retrieved state handle is broken. Try cleaning the state handle store.", (Throwable)ioe);
                }
            }
            this.addedExecutionPlans.add(executionPlan.getJobID());
            LOG.info("Recovered {}.", (Object)executionPlan);
            success = true;
            return executionPlan;
            finally {
                if (!success) {
                    this.executionPlanStateHandleStore.release(name);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void putExecutionPlan(ExecutionPlan executionPlan) throws Exception {
        Preconditions.checkNotNull((Object)executionPlan, (String)"Execution Plan");
        JobID jobID = executionPlan.getJobID();
        String name = this.executionPlanStoreUtil.jobIDToName(jobID);
        LOG.debug("Adding execution plan {} to {}.", (Object)jobID, this.executionPlanStateHandleStore);
        boolean success = false;
        while (!success) {
            Object object = this.lock;
            synchronized (object) {
                this.verifyIsRunning();
                R currentVersion = this.executionPlanStateHandleStore.exists(name);
                if (!currentVersion.isExisting()) {
                    try {
                        this.executionPlanStateHandleStore.addAndLock(name, executionPlan);
                        this.addedExecutionPlans.add(jobID);
                        success = true;
                    }
                    catch (StateHandleStore.AlreadyExistException ignored) {
                        LOG.warn("{} already exists in {}.", (Object)executionPlan, this.executionPlanStateHandleStore);
                    }
                } else if (this.addedExecutionPlans.contains(jobID)) {
                    try {
                        this.executionPlanStateHandleStore.replace(name, currentVersion, executionPlan);
                        LOG.info("Updated {} in {}.", (Object)executionPlan, (Object)this.getClass().getSimpleName());
                        success = true;
                    }
                    catch (StateHandleStore.NotExistException ignored) {
                        LOG.warn("{} does not exists in {}.", (Object)executionPlan, this.executionPlanStateHandleStore);
                    }
                } else {
                    throw new IllegalStateException("Trying to update an execution plan you didn't #getAllSubmittedExecutionPlans() or #putExecutionPlan() yourself before.");
                }
            }
        }
        LOG.info("Added {} to {}.", (Object)executionPlan, this.executionPlanStateHandleStore);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void putJobResourceRequirements(JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            ExecutionPlan executionPlan = this.recoverExecutionPlan(jobId);
            if (executionPlan == null) {
                throw new NoSuchElementException(String.format("ExecutionPlan for job [%s] was not found in ExecutionPlanStore and is needed for attaching JobResourceRequirements.", jobId));
            }
            JobResourceRequirements.writeToExecutionPlan(executionPlan, jobResourceRequirements);
            this.putExecutionPlan(executionPlan);
        }
    }

    @Override
    public CompletableFuture<Void> globalCleanupAsync(JobID jobId, Executor executor) {
        Preconditions.checkNotNull((Object)jobId, (String)"Job ID");
        return this.runAsyncWithLockAssertRunning((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            LOG.debug("Removing execution plan {} from {}.", (Object)jobId, this.executionPlanStateHandleStore);
            String name = this.executionPlanStoreUtil.jobIDToName(jobId);
            this.releaseAndRemoveOrThrowCompletionException(jobId, name);
            this.addedExecutionPlans.remove(jobId);
            LOG.info("Removed execution plan {} from {}.", (Object)jobId, this.executionPlanStateHandleStore);
        }), executor);
    }

    @GuardedBy(value="lock")
    private void releaseAndRemoveOrThrowCompletionException(JobID jobId, String jobName) {
        boolean success;
        try {
            success = this.executionPlanStateHandleStore.releaseAndTryRemove(jobName);
        }
        catch (Exception e) {
            throw new CompletionException(e);
        }
        if (!success) {
            throw new CompletionException(new FlinkException(String.format("Could not remove execution plan with job id %s from %s.", jobId, this.executionPlanStateHandleStore)));
        }
    }

    @Override
    public CompletableFuture<Void> localCleanupAsync(JobID jobId, Executor executor) {
        Preconditions.checkNotNull((Object)jobId, (String)"Job ID");
        return this.runAsyncWithLockAssertRunning((ThrowingRunnable<Exception>)((ThrowingRunnable)() -> {
            LOG.debug("Releasing execution plan {} from {}.", (Object)jobId, this.executionPlanStateHandleStore);
            this.executionPlanStateHandleStore.release(this.executionPlanStoreUtil.jobIDToName(jobId));
            this.addedExecutionPlans.remove(jobId);
            LOG.info("Released execution plan {} from {}.", (Object)jobId, this.executionPlanStateHandleStore);
        }), executor);
    }

    private CompletableFuture<Void> runAsyncWithLockAssertRunning(ThrowingRunnable<Exception> runnable, Executor executor) {
        return CompletableFuture.runAsync(() -> {
            Object object = this.lock;
            synchronized (object) {
                this.verifyIsRunning();
                try {
                    runnable.run();
                }
                catch (Exception e) {
                    throw new CompletionException(e);
                }
            }
        }, executor);
    }

    @Override
    public Collection<JobID> getJobIds() throws Exception {
        Collection<String> names;
        LOG.debug("Retrieving all stored job ids from {}.", this.executionPlanStateHandleStore);
        try {
            names = this.executionPlanStateHandleStore.getAllHandles();
        }
        catch (Exception e) {
            throw new Exception("Failed to retrieve all job ids from " + String.valueOf(this.executionPlanStateHandleStore) + ".", e);
        }
        ArrayList<JobID> jobIds = new ArrayList<JobID>(names.size());
        for (String name : names) {
            try {
                jobIds.add(this.executionPlanStoreUtil.nameToJobID(name));
            }
            catch (Exception exception) {
                LOG.warn("Could not parse job id from {}. This indicates a malformed name.", (Object)name, (Object)exception);
            }
        }
        LOG.info("Retrieved job ids {} from {}", jobIds, this.executionPlanStateHandleStore);
        return jobIds;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onAddedExecutionPlan(JobID jobId) {
        Object object = this.lock;
        synchronized (object) {
            if (this.running && !this.addedExecutionPlans.contains(jobId)) {
                try {
                    this.executionPlanListener.onAddedExecutionPlan(jobId);
                }
                catch (Throwable t) {
                    LOG.error("Failed to notify execution plan listener onAddedExecutionPlan event for {}", (Object)jobId, (Object)t);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onRemovedExecutionPlan(JobID jobId) {
        Object object = this.lock;
        synchronized (object) {
            if (this.running && this.addedExecutionPlans.contains(jobId)) {
                try {
                    this.executionPlanListener.onRemovedExecutionPlan(jobId);
                }
                catch (Throwable t) {
                    LOG.error("Failed to notify execution plan listener onRemovedExecutionPlan event for {}", (Object)jobId, (Object)t);
                }
            }
        }
    }

    private void verifyIsRunning() {
        Preconditions.checkState((boolean)this.running, (Object)"Not running. Forgot to call start()?");
    }
}

