/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import akka.actor.ActorSystem;
import akka.actor.Props;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.SavepointCoordinatorDeActivator;
import org.apache.flink.runtime.checkpoint.StateForTask;
import org.apache.flink.runtime.checkpoint.StateStore;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.impl.Promise;

public class SavepointCoordinator
extends CheckpointCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointCoordinator.class);
    private StateStore<CompletedCheckpoint> savepointStore;
    private final Map<Long, Promise<String>> savepointPromises;
    private volatile String savepointRestorePath;

    public SavepointCoordinator(JobID jobId, long baseInterval, long checkpointTimeout, ExecutionVertex[] tasksToTrigger, ExecutionVertex[] tasksToWaitFor, ExecutionVertex[] tasksToCommitTo, ClassLoader userClassLoader, CheckpointIDCounter checkpointIDCounter, StateStore<CompletedCheckpoint> savepointStore, CheckpointStatsTracker statsTracker) throws Exception {
        super(jobId, baseInterval, checkpointTimeout, 0L, Integer.MAX_VALUE, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, userClassLoader, checkpointIDCounter, IgnoreCompletedCheckpointsStore.INSTANCE, RecoveryMode.STANDALONE, statsTracker);
        this.savepointStore = Preconditions.checkNotNull(savepointStore);
        this.savepointPromises = new ConcurrentHashMap<Long, Promise<String>>();
    }

    public String getSavepointRestorePath() {
        return this.savepointRestorePath;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Future<String> triggerSavepoint(long timestamp) throws Exception {
        Promise.DefaultPromise promise;
        block8: {
            promise = new Promise.DefaultPromise();
            try {
                long checkpointId = this.getAndIncrementCheckpointId();
                if (checkpointId == -1L) {
                    throw new IllegalStateException("Failed to get checkpoint Id");
                }
                if (this.savepointPromises.put(checkpointId, (Promise<String>)promise) == null) {
                    boolean success = false;
                    try {
                        success = this.triggerCheckpoint(timestamp, checkpointId);
                        break block8;
                    }
                    finally {
                        if (!success) {
                            this.savepointPromises.remove(checkpointId);
                            promise.failure((Throwable)new Exception("Failed to trigger savepoint"));
                        }
                    }
                }
                throw new IllegalStateException("Duplicate checkpoint ID");
            }
            catch (Throwable t) {
                promise.failure((Throwable)new Exception("Failed to trigger savepoint", t));
            }
        }
        return promise.future();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void restoreSavepoint(Map<JobVertexID, ExecutionJobVertex> tasks, String savepointPath) throws Exception {
        Preconditions.checkNotNull(savepointPath, "Savepoint path");
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutdown()) {
                throw new IllegalStateException("CheckpointCoordinator is shut down");
            }
            long recoveryTimestamp = System.currentTimeMillis();
            LOG.info("Rolling back to savepoint '{}'.", (Object)savepointPath);
            CompletedCheckpoint checkpoint = this.savepointStore.getState(savepointPath);
            LOG.info("Savepoint: {}@{}", (Object)checkpoint.getCheckpointID(), (Object)checkpoint.getTimestamp());
            LOG.debug("Rolling back individual operators.");
            for (StateForTask state : checkpoint.getStates()) {
                String msg;
                LOG.debug("Rolling back subtask {} of operator {}.", (Object)state.getSubtask(), (Object)state.getOperatorId());
                ExecutionJobVertex vertex = tasks.get((Object)state.getOperatorId());
                if (vertex == null) {
                    msg = String.format("Failed to rollback to savepoint %s. Cannot map old state for task %s to the new program. This indicates that the program has been changed in a non-compatible way  after the savepoint.", new Object[]{checkpoint, state.getOperatorId()});
                    throw new IllegalStateException(msg);
                }
                if (state.getSubtask() >= vertex.getParallelism()) {
                    msg = String.format("Failed to rollback to savepoint %s. Parallelism mismatch between savepoint state and new program. Cannot map subtask %d of operator %s to new program with parallelism %d. This indicates that the program has been changed in a non-compatible way after the savepoint.", new Object[]{checkpoint, state.getSubtask(), state.getOperatorId(), vertex.getParallelism()});
                    throw new IllegalStateException(msg);
                }
                Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
                exec.setInitialState(state.getState(), recoveryTimestamp);
            }
            long nextCheckpointId = checkpoint.getCheckpointID();
            this.checkpointIdCounter.start();
            this.checkpointIdCounter.setCount(nextCheckpointId + 1L);
            LOG.info("Reset the checkpoint ID to {}", (Object)nextCheckpointId);
            if (this.savepointRestorePath == null) {
                this.savepointRestorePath = savepointPath;
            }
        }
    }

    @Override
    protected void onShutdown() {
        for (Promise<String> promise : this.savepointPromises.values()) {
            promise.failure((Throwable)new Exception("Checkpoint coordinator shutdown"));
        }
        this.savepointPromises.clear();
    }

    @Override
    protected void onCancelCheckpoint(long canceledCheckpointId) {
        Promise<String> promise = this.savepointPromises.remove(canceledCheckpointId);
        if (promise != null) {
            promise.failure((Throwable)new Exception("Savepoint expired before completing"));
        }
    }

    @Override
    protected void onFullyAcknowledgedCheckpoint(CompletedCheckpoint checkpoint) {
        Promise<String> promise = Preconditions.checkNotNull(this.savepointPromises.remove(checkpoint.getCheckpointID()));
        if (promise.isCompleted()) {
            throw new IllegalStateException("Savepoint promise completed");
        }
        try {
            String savepointPath = this.savepointStore.putState(checkpoint);
            promise.success((Object)savepointPath);
        }
        catch (Exception e) {
            LOG.warn("Failed to store savepoint.", (Throwable)e);
            promise.failure((Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutdown()) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            if (this.getJobStatusListener() == null) {
                Props props = Props.create(SavepointCoordinatorDeActivator.class, (Object[])new Object[]{this, leaderSessionID});
                this.setJobStatusListener(new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID));
            }
            return this.getJobStatusListener();
        }
    }

    private static class IgnoreCompletedCheckpointsStore
    implements CompletedCheckpointStore {
        private static final CompletedCheckpointStore INSTANCE = new IgnoreCompletedCheckpointsStore();

        private IgnoreCompletedCheckpointsStore() {
        }

        @Override
        public void recover() throws Exception {
        }

        @Override
        public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
        }

        @Override
        public CompletedCheckpoint getLatestCheckpoint() throws Exception {
            return null;
        }

        @Override
        public void discardAllCheckpoints() throws Exception {
        }

        @Override
        public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
            return Collections.emptyList();
        }

        @Override
        public int getNumberOfRetainedCheckpoints() {
            return 0;
        }
    }
}

