/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorDeActivator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckpointCoordinator {
    /** Logger used for all messages emitted by this class. */
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
    // NOTE(review): not referenced in the visible part of this class; the literal 16 is used instead when
    // sizing and trimming recentPendingCheckpoints - presumably this constant was inlined. Confirm.
    private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
    /** Monitor object that the synchronized sections of this class lock on. */
    private final Object lock = new Object();
    /** Job this coordinator belongs to; incoming decline/acknowledge messages are compared against it. */
    private final JobID job;
    /** Properties used by {@code triggerCheckpoint(long, boolean)}; built from the configured retention policy. */
    private final CheckpointProperties checkpointProperties;
    /** Executor handed to pending checkpoints and master hooks, and used to discard checkpoints that failed to store. */
    private final Executor executor;
    /** Vertices whose current execution attempt receives the trigger call. */
    private final ExecutionVertex[] tasksToTrigger;
    /** Vertices whose current execution attempt is registered as an expected acknowledger. */
    private final ExecutionVertex[] tasksToWaitFor;
    /** Vertices whose current execution attempt is notified once a checkpoint completed. */
    private final ExecutionVertex[] tasksToCommitTo;
    /** In-flight checkpoints keyed by checkpoint id (a LinkedHashMap, so iteration follows insertion order). */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    /** Store that completed checkpoints are added to. */
    private final CompletedCheckpointStore completedCheckpointStore;
    /** Storage view created from the state backend; provides the location for each checkpoint/savepoint. */
    private final CheckpointStorageCoordinatorView checkpointStorage;
    /** Ids of checkpoints that recently expired, completed or were subsumed; holds at most 16 entries. */
    private final ArrayDeque<Long> recentPendingCheckpoints;
    /** Source of new checkpoint ids. */
    private final CheckpointIDCounter checkpointIdCounter;
    /** Configured checkpoint interval, raised to at least minPauseBetweenCheckpoints by the constructor. */
    private final long baseInterval;
    /** Delay in milliseconds after which a pending checkpoint is expired; also bounds the wait on master hooks. */
    private final long checkpointTimeout;
    /** Configured minimum pause, capped at 31536000000 ms by the constructor. */
    private final long minPauseBetweenCheckpoints;
    /** Value of the configuration's getMaxConcurrentCheckpoints(). */
    private final int maxConcurrentCheckpointAttempts;
    /** Executor that schedules the expiry canceller and runs trigger requests. */
    private final ScheduledExecutor timer;
    /** Registered master hooks keyed by their identifier. */
    private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;
    private JobStatusListener jobStatusListener;
    /** Incremented whenever triggering fails and reset to 0 after a successful trigger. */
    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
    /** Handle of the currently scheduled periodic trigger; cancelled and replaced by triggerQueuedRequests(). */
    private ScheduledFuture<?> currentPeriodicTrigger;
    /** Value of clock.relativeTimeMillis() taken when the last checkpoint completed. */
    private long lastCheckpointCompletionRelativeTime;
    /** Cleared on shutdown; selects how triggerQueuedRequests() re-triggers. */
    private boolean periodicScheduling;
    /** Cleared on shutdown and when triggerQueuedRequests() consumes the queued request. */
    private boolean triggerRequestQueued;
    /** Set once by shutdown(JobStatus); volatile because it is also read without holding the lock. */
    private volatile boolean shutdown;
    /** Optional statistics tracker; when present it is informed about each new pending checkpoint. */
    @Nullable
    private CheckpointStatsTracker statsTracker;
    /** Factory used to create sharedStateRegistry. */
    private final SharedStateRegistryFactory sharedStateRegistryFactory;
    /** Registry that the operator states of a checkpoint are registered with before it is finalized. */
    private SharedStateRegistry sharedStateRegistry;
    /** Value of the configuration's isPreferCheckpointForRecovery(). */
    private boolean isPreferCheckpointForRecovery;
    /** Receives checkpoint success and failure callbacks. */
    private final CheckpointFailureManager failureManager;
    /** Time source for lastCheckpointCompletionRelativeTime. */
    private final Clock clock;

    /**
     * Creates a coordinator that takes its time from {@link SystemClock#getInstance()}.
     *
     * <p>All arguments are forwarded unchanged to the constructor that additionally accepts a
     * {@link Clock}.
     */
    public CheckpointCoordinator(
            JobID job,
            CheckpointCoordinatorConfiguration chkConfig,
            ExecutionVertex[] tasksToTrigger,
            ExecutionVertex[] tasksToWaitFor,
            ExecutionVertex[] tasksToCommitTo,
            CheckpointIDCounter checkpointIDCounter,
            CompletedCheckpointStore completedCheckpointStore,
            StateBackend checkpointStateBackend,
            Executor executor,
            ScheduledExecutor timer,
            SharedStateRegistryFactory sharedStateRegistryFactory,
            CheckpointFailureManager failureManager) {
        this(
                job,
                chkConfig,
                tasksToTrigger,
                tasksToWaitFor,
                tasksToCommitTo,
                checkpointIDCounter,
                completedCheckpointStore,
                checkpointStateBackend,
                executor,
                timer,
                sharedStateRegistryFactory,
                failureManager,
                SystemClock.getInstance());
    }

    /**
     * Creates a coordinator with an explicit time source.
     *
     * <p>Besides storing its arguments, the constructor creates the shared state registry, creates the
     * checkpoint storage from the state backend and initializes its base locations, and starts the
     * checkpoint id counter.
     *
     * @param job id of the job; must not be null
     * @param chkConfig supplies interval, timeout, minimum pause, concurrency limit, retention policy and
     *     the prefer-checkpoint-for-recovery flag
     * @param tasksToTrigger vertices that receive the trigger call; must not be null
     * @param tasksToWaitFor vertices that have to acknowledge; must not be null
     * @param tasksToCommitTo vertices that are notified on completion; must not be null
     * @param checkpointIDCounter source of checkpoint ids; must not be null
     * @param completedCheckpointStore store for completed checkpoints; must not be null
     * @param checkpointStateBackend backend used to create the checkpoint storage; must not be null
     * @param executor executor for asynchronous work; must not be null
     * @param timer scheduled executor for timeouts and triggers (stored as given, not null-checked)
     * @param sharedStateRegistryFactory factory for the shared state registry; must not be null
     * @param failureManager receiver of success/failure callbacks; must not be null
     * @param clock time source; must not be null
     * @throws FlinkRuntimeException if the checkpoint storage cannot be created or initialized
     * @throws RuntimeException if the checkpoint id counter fails to start
     */
    @VisibleForTesting
    public CheckpointCoordinator(JobID job, CheckpointCoordinatorConfiguration chkConfig, ExecutionVertex[] tasksToTrigger, ExecutionVertex[] tasksToWaitFor, ExecutionVertex[] tasksToCommitTo, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, StateBackend checkpointStateBackend, Executor executor, ScheduledExecutor timer, SharedStateRegistryFactory sharedStateRegistryFactory, CheckpointFailureManager failureManager, Clock clock) {
        Preconditions.checkNotNull(checkpointStateBackend);

        // Cap the pause at 31536000000 ms (365 days).
        long minPauseBetweenCheckpoints = chkConfig.getMinPauseBetweenCheckpoints();
        if (minPauseBetweenCheckpoints > 31536000000L) {
            minPauseBetweenCheckpoints = 31536000000L;
        }

        // The effective interval is never shorter than the minimum pause.
        long baseInterval = chkConfig.getCheckpointInterval();
        if (baseInterval < minPauseBetweenCheckpoints) {
            baseInterval = minPauseBetweenCheckpoints;
        }

        this.job = Preconditions.checkNotNull(job);
        this.baseInterval = baseInterval;
        this.checkpointTimeout = chkConfig.getCheckpointTimeout();
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
        this.maxConcurrentCheckpointAttempts = chkConfig.getMaxConcurrentCheckpoints();
        this.tasksToTrigger = Preconditions.checkNotNull(tasksToTrigger);
        this.tasksToWaitFor = Preconditions.checkNotNull(tasksToWaitFor);
        this.tasksToCommitTo = Preconditions.checkNotNull(tasksToCommitTo);
        this.pendingCheckpoints = new LinkedHashMap<Long, PendingCheckpoint>();
        this.checkpointIdCounter = Preconditions.checkNotNull(checkpointIDCounter);
        this.completedCheckpointStore = Preconditions.checkNotNull(completedCheckpointStore);
        this.executor = Preconditions.checkNotNull(executor);
        this.sharedStateRegistryFactory = Preconditions.checkNotNull(sharedStateRegistryFactory);
        this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
        this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
        this.failureManager = Preconditions.checkNotNull(failureManager);
        this.clock = Preconditions.checkNotNull(clock);
        // Parameterized instead of raw types so the assignments are checked by the compiler.
        this.recentPendingCheckpoints = new ArrayDeque<Long>(16);
        this.masterHooks = new HashMap<String, MasterTriggerRestoreHook<?>>();
        this.timer = timer;
        this.checkpointProperties = CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy());

        try {
            this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
            this.checkpointStorage.initializeBaseLocations();
        }
        catch (IOException e) {
            throw new FlinkRuntimeException("Failed to create checkpoint storage at checkpoint coordinator side.", e);
        }

        try {
            checkpointIDCounter.start();
        }
        catch (Throwable t) {
            throw new RuntimeException("Failed to start checkpoint ID counter: " + t.getMessage(), t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Registers a master hook under its identifier unless a hook with that identifier already exists.
     *
     * @param hook the hook to register; must not be null and must report a non-blank identifier
     * @return {@code true} if the hook was registered, {@code false} if the identifier was already taken
     */
    public boolean addMasterHook(MasterTriggerRestoreHook<?> hook) {
        Preconditions.checkNotNull(hook);
        final String hookId = hook.getIdentifier();
        final boolean hasUsableId = !StringUtils.isNullOrWhitespaceOnly(hookId);
        Preconditions.checkArgument(hasUsableId, (Object) "The hook has a null or empty id");

        synchronized (this.lock) {
            // Registered values are never null, so a null result means the key was absent before.
            return this.masterHooks.putIfAbsent(hookId, hook) == null;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns how many master hooks are currently registered.
     *
     * @return the size of the hook map, read while holding the coordinator lock
     */
    public int getNumberOfRegisteredMasterHooks() {
        synchronized (this.lock) {
            final int registeredHooks = this.masterHooks.size();
            return registeredHooks;
        }
    }

    /**
     * Replaces the statistics tracker.
     *
     * @param statsTracker the new tracker, or {@code null} to stop reporting pending checkpoints
     */
    public void setCheckpointStatsTracker(
            @Nullable CheckpointStatsTracker statsTracker) {
        this.statsTracker = statsTracker;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Shuts the coordinator down; only the first call has an effect.
     *
     * <p>Under the coordinator lock this clears the scheduling flags, closes and forgets the master
     * hooks, fails every pending checkpoint with {@code CHECKPOINT_COORDINATOR_SHUTDOWN}, and then shuts
     * down the completed checkpoint store and the checkpoint id counter.
     *
     * @param jobStatus status passed on to the store and the id counter
     * @throws Exception if any of the shutdown steps fails
     */
    public void shutdown(JobStatus jobStatus) throws Exception {
        synchronized (this.lock) {
            if (this.shutdown) {
                // Already shut down by an earlier call.
                return;
            }
            this.shutdown = true;
            LOG.info("Stopping checkpoint coordinator for job {}.", this.job);

            this.periodicScheduling = false;
            this.triggerRequestQueued = false;

            MasterHooks.close(this.masterHooks.values(), LOG);
            this.masterHooks.clear();

            for (PendingCheckpoint inFlight : this.pendingCheckpoints.values()) {
                this.failPendingCheckpoint(inFlight, CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
            }
            this.pendingCheckpoints.clear();

            this.completedCheckpointStore.shutdown(jobStatus);
            this.checkpointIdCounter.shutdown(jobStatus);
        }
    }

    /**
     * Tells whether {@code shutdown(JobStatus)} has already been executed.
     *
     * @return the current value of the volatile shutdown flag (read without taking the lock)
     */
    public boolean isShutdown() {
        final boolean alreadyShutDown = this.shutdown;
        return alreadyShutDown;
    }

    /**
     * Triggers a savepoint using {@code CheckpointProperties.forSavepoint()}.
     *
     * @param timestamp timestamp to associate with the savepoint
     * @param targetLocation target location handed on to the trigger logic; may be {@code null}
     * @return future that is completed with the savepoint or with the failure
     */
    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(long timestamp, @Nullable String targetLocation) {
        return this.triggerSavepointInternal(
                timestamp,
                CheckpointProperties.forSavepoint(),
                false,
                targetLocation);
    }

    /**
     * Triggers a savepoint using {@code CheckpointProperties.forSyncSavepoint()}.
     *
     * <p>If the savepoint fails, the failure manager is informed via
     * {@code handleSynchronousSavepointFailure} and the returned future fails with a
     * {@link CompletionException} wrapping the original failure.
     *
     * @param timestamp timestamp to associate with the savepoint
     * @param advanceToEndOfEventTime forwarded to the trigger logic
     * @param targetLocation target location handed on to the trigger logic; may be {@code null}
     * @return future that is completed with the savepoint or with the wrapped failure
     */
    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(long timestamp, boolean advanceToEndOfEventTime, @Nullable String targetLocation) {
        final CheckpointProperties syncSavepointProperties = CheckpointProperties.forSyncSavepoint();
        final CompletableFuture<CompletedCheckpoint> savepointFuture =
                this.triggerSavepointInternal(timestamp, syncSavepointProperties, advanceToEndOfEventTime, targetLocation);

        return savepointFuture.handle((savepoint, failure) -> {
            if (failure == null) {
                return savepoint;
            }
            this.failureManager.handleSynchronousSavepointFailure(failure);
            throw new CompletionException(failure);
        });
    }

    /**
     * Hands the actual trigger call to the timer executor and exposes its outcome as a future.
     *
     * @param timestamp timestamp to associate with the savepoint
     * @param checkpointProperties properties of the savepoint; must not be null
     * @param advanceToEndOfEventTime forwarded to {@code triggerCheckpoint}
     * @param targetLocation forwarded to {@code triggerCheckpoint} as external savepoint location; may be
     *     {@code null}
     * @return future completed with the result of the triggered checkpoint, or exceptionally with either
     *     that checkpoint's failure or a {@link CheckpointException} if triggering itself was rejected
     */
    private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(long timestamp, CheckpointProperties checkpointProperties, boolean advanceToEndOfEventTime, @Nullable String targetLocation) {
        Preconditions.checkNotNull(checkpointProperties);
        final CompletableFuture<CompletedCheckpoint> resultFuture = new CompletableFuture<CompletedCheckpoint>();

        this.timer.execute(() -> {
            try {
                this.triggerCheckpoint(timestamp, checkpointProperties, targetLocation, false, advanceToEndOfEventTime)
                        .whenComplete((completedCheckpoint, throwable) -> {
                            if (throwable == null) {
                                resultFuture.complete(completedCheckpoint);
                            } else {
                                resultFuture.completeExceptionally(throwable);
                            }
                        });
            }
            catch (CheckpointException e) {
                // Keep the original exception as cause so its stack trace is not lost.
                CheckpointException cause = new CheckpointException("Failed to trigger savepoint.", e.getCheckpointFailureReason(), e);
                resultFuture.completeExceptionally(cause);
            }
        });

        return resultFuture;
    }

    /**
     * Triggers a checkpoint with the coordinator's default checkpoint properties.
     *
     * <p>If triggering is rejected with a {@link CheckpointException}, the failure manager is informed
     * and an exceptionally completed future is returned instead of throwing. No id of the failed attempt
     * is available at that point, so the negated latest value of the checkpoint id counter is passed to
     * the failure manager.
     *
     * @param timestamp timestamp to associate with the checkpoint
     * @param isPeriodic forwarded to the trigger logic
     * @return future for the completed checkpoint, or an exceptionally completed future
     */
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(long timestamp, boolean isPeriodic) {
        CompletableFuture<CompletedCheckpoint> outcome;
        try {
            outcome = this.triggerCheckpoint(timestamp, this.checkpointProperties, null, isPeriodic, false);
        }
        catch (CheckpointException triggerFailure) {
            final long latestId = this.getCheckpointIdCounter().get();
            this.failureManager.handleJobLevelCheckpointException(triggerFailure, -latestId);
            outcome = FutureUtils.completedExceptionally(triggerFailure);
        }
        return outcome;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Triggers a checkpoint or savepoint with the given properties.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>run the pre-trigger check under the lock;</li>
     *   <li>verify that every task to trigger has a RUNNING execution attempt and that every task to
     *       wait for has an execution attempt;</li>
     *   <li>outside the lock, obtain the checkpoint id and initialize the storage location;</li>
     *   <li>create the pending checkpoint and the expiry task;</li>
     *   <li>under the lock, repeat the pre-trigger check, register the pending checkpoint, schedule the
     *       expiry task and collect the master hook states;</li>
     *   <li>outside the lock, send the trigger call to every execution.</li>
     * </ol>
     * If anything fails from step 5 onwards, the pending checkpoint is removed and failed and the storage
     * location is disposed.
     *
     * @param timestamp timestamp to associate with the checkpoint
     * @param props properties of the checkpoint
     * @param externalSavepointLocation target location for savepoints; may be {@code null}
     * @param isPeriodic forwarded to the pre-trigger check
     * @param advanceToEndOfTime only permitted when {@code props} describes a synchronous savepoint
     * @return the completion future of the pending checkpoint
     * @throws IllegalArgumentException if {@code advanceToEndOfTime} is set for anything but a
     *     synchronous savepoint
     * @throws CheckpointException if the checkpoint could not be triggered
     */
    @VisibleForTesting
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointProperties props, @Nullable String externalSavepointLocation, boolean isPeriodic, boolean advanceToEndOfTime) throws CheckpointException {
        if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        synchronized (this.lock) {
            this.preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
        }

        // Every task that has to be triggered needs a running execution attempt.
        final Execution[] executions = new Execution[this.tasksToTrigger.length];
        for (int i = 0; i < this.tasksToTrigger.length; ++i) {
            final Execution ee = this.tasksToTrigger[i].getCurrentExecutionAttempt();
            if (ee == null) {
                LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.", this.tasksToTrigger[i].getTaskNameWithSubtaskIndex(), this.job);
                throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
            if (ee.getState() != ExecutionState.RUNNING) {
                LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.", this.tasksToTrigger[i].getTaskNameWithSubtaskIndex(), this.job, ExecutionState.RUNNING, ee.getState());
                throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
            executions[i] = ee;
        }

        // Every task that has to acknowledge needs an execution attempt (its state is not checked here).
        final HashMap<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<ExecutionAttemptID, ExecutionVertex>(this.tasksToWaitFor.length);
        for (ExecutionVertex ev : this.tasksToWaitFor) {
            final Execution ee = ev.getCurrentExecutionAttempt();
            if (ee == null) {
                LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.", ev.getTaskNameWithSubtaskIndex(), this.job);
                throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
            ackTasks.put(ee.getAttemptId(), ev);
        }

        // Id allocation and storage initialization run without holding the coordinator lock.
        final CheckpointStorageLocation checkpointStorageLocation;
        final long checkpointID;
        try {
            checkpointID = this.checkpointIdCounter.getAndIncrement();
            checkpointStorageLocation = props.isSavepoint()
                    ? this.checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation)
                    : this.checkpointStorage.initializeLocationForCheckpoint(checkpointID);
        }
        catch (Throwable t) {
            int numUnsuccessful = this.numUnsuccessfulCheckpointsTriggers.incrementAndGet();
            LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).", this.job, numUnsuccessful, t);
            throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
        }

        final PendingCheckpoint checkpoint = new PendingCheckpoint(this.job, checkpointID, timestamp, ackTasks, this.masterHooks.keySet(), props, checkpointStorageLocation, this.executor);
        if (this.statsTracker != null) {
            PendingCheckpointStats callback = this.statsTracker.reportPendingCheckpoint(checkpointID, timestamp, props);
            checkpoint.setStatsCallback(callback);
        }

        // Expires the checkpoint once the timeout elapsed, unless it has been discarded in the meantime.
        final Runnable canceller = () -> {
            synchronized (this.lock) {
                if (!checkpoint.isDiscarded()) {
                    LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, this.job);
                    this.failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
                    this.pendingCheckpoints.remove(checkpointID);
                    this.rememberRecentCheckpointId(checkpointID);
                    this.triggerQueuedRequests();
                }
            }
        };

        try {
            synchronized (this.lock) {
                // Conditions may have changed while the lock was released, so check again.
                this.preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
                LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, this.job);
                this.pendingCheckpoints.put(checkpointID, checkpoint);

                final ScheduledFuture<?> cancellerHandle = this.timer.schedule(canceller, this.checkpointTimeout, TimeUnit.MILLISECONDS);
                if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                    // The pending checkpoint did not accept the handle, so the expiry task is not needed.
                    cancellerHandle.cancel(false);
                }

                // Blocks (for at most the checkpoint timeout per hook) while holding the lock.
                for (MasterTriggerRestoreHook<?> masterHook : this.masterHooks.values()) {
                    final MasterState masterState = MasterHooks.triggerHook(masterHook, checkpointID, timestamp, this.executor).get(this.checkpointTimeout, TimeUnit.MILLISECONDS);
                    checkpoint.acknowledgeMasterState(masterHook.getIdentifier(), masterState);
                }
                Preconditions.checkState(checkpoint.areMasterStatesFullyAcknowledged());
            }

            final CheckpointOptions checkpointOptions = new CheckpointOptions(props.getCheckpointType(), checkpointStorageLocation.getLocationReference());
            for (Execution execution : executions) {
                if (props.isSynchronous()) {
                    execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
                } else {
                    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                }
            }

            this.numUnsuccessfulCheckpointsTriggers.set(0);
            return checkpoint.getCompletionFuture();
        }
        catch (Throwable t) {
            synchronized (this.lock) {
                this.pendingCheckpoints.remove(checkpointID);
            }

            int numUnsuccessful = this.numUnsuccessfulCheckpointsTriggers.incrementAndGet();
            LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)", checkpointID, this.job, numUnsuccessful, t);

            if (!checkpoint.isDiscarded()) {
                this.failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
            }

            try {
                checkpointStorageLocation.disposeOnFailure();
            }
            catch (Throwable t2) {
                LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
            }

            // A CheckpointException is passed on unchanged; everything else is wrapped.
            if (t instanceof CheckpointException) {
                throw (CheckpointException) t;
            }
            throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a task's message declining a checkpoint.
     *
     * <p>Messages are ignored when they are {@code null} or when the coordinator is shut down. If the
     * declined checkpoint is still pending it is removed from the pending set and passed to
     * {@code discardCheckpoint}; otherwise the message is only logged at debug level.
     *
     * @param message the decline message; may be {@code null}
     * @param taskManagerLocationInfo sender description, used for logging and error messages only
     * @throws IllegalArgumentException if the message belongs to a different job
     * @throws IllegalStateException if the referenced checkpoint is still registered as pending although
     *     it is already discarded
     */
    public void receiveDeclineMessage(DeclineCheckpoint message, String taskManagerLocationInfo) {
        if (this.shutdown || message == null) {
            return;
        }
        if (!this.job.equals((Object) message.getJob())) {
            throw new IllegalArgumentException("Received DeclineCheckpoint message for job " + message.getJob() + " from " + taskManagerLocationInfo + " while this coordinator handles job " + this.job);
        }

        final long checkpointId = message.getCheckpointId();
        final String reason = message.getReason() != null ? message.getReason().getMessage() : "";

        synchronized (this.lock) {
            // Shutdown may have happened between the unlocked check above and taking the lock.
            if (this.shutdown) {
                return;
            }

            final PendingCheckpoint declined = this.pendingCheckpoints.remove(checkpointId);

            if (declined == null) {
                if (!LOG.isDebugEnabled()) {
                    return;
                }
                if (this.recentPendingCheckpoints.contains(checkpointId)) {
                    LOG.debug("Received another decline message for now expired checkpoint attempt {} from task {} of job {} at {} : {}", checkpointId, message.getTaskExecutionId(), this.job, taskManagerLocationInfo, reason);
                } else {
                    LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} from task {} of job {} at {} : {}", checkpointId, message.getTaskExecutionId(), this.job, taskManagerLocationInfo, reason);
                }
                return;
            }

            if (declined.isDiscarded()) {
                throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
            }

            LOG.info("Decline checkpoint {} by task {} of job {} at {}.", checkpointId, message.getTaskExecutionId(), this.job, taskManagerLocationInfo);
            this.discardCheckpoint(declined, message.getReason(), message.getTaskExecutionId());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles a task's message acknowledging a checkpoint.
     *
     * <p>If the checkpoint is still pending, the acknowledgement is recorded and the checkpoint is
     * completed once all tasks have acknowledged. If the checkpoint is no longer pending, or the pending
     * checkpoint rejects the acknowledgement as UNKNOWN or DISCARDED, the reported subtask state is handed
     * to {@code discardSubtaskState}.
     *
     * @param message the acknowledge message; may be {@code null}
     * @param taskManagerLocationInfo sender description, used for logging only
     * @return {@code true} if the message referred to a pending checkpoint or to one of the remembered
     *     recent checkpoint ids; {@code false} otherwise, including when the message is {@code null},
     *     belongs to another job, or the coordinator is shut down
     * @throws CheckpointException if completing the pending checkpoint fails
     * @throws IllegalStateException if the referenced checkpoint is still registered as pending although
     *     it is already discarded
     */
    public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message, String taskManagerLocationInfo) throws CheckpointException {
        if (this.shutdown || message == null) {
            return false;
        }
        if (!this.job.equals((Object) message.getJob())) {
            LOG.error("Received wrong AcknowledgeCheckpoint message for job {} from {} : {}", this.job, taskManagerLocationInfo, message);
            return false;
        }

        final long checkpointId = message.getCheckpointId();

        synchronized (this.lock) {
            // Shutdown may have happened between the unlocked check above and taking the lock.
            if (this.shutdown) {
                return false;
            }

            final PendingCheckpoint pending = this.pendingCheckpoints.get(checkpointId);

            if (pending == null) {
                // Not pending (any more): report whether the id is among the remembered recent ones.
                final boolean wasPendingCheckpoint = this.recentPendingCheckpoints.contains(checkpointId);
                if (wasPendingCheckpoint) {
                    LOG.warn("Received late message for now expired checkpoint attempt {} from task {} of job {} at {}.", checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                } else {
                    LOG.debug("Received message for an unknown checkpoint {} from task {} of job {} at {}.", checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                }
                this.discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                return wasPendingCheckpoint;
            }

            if (pending.isDiscarded()) {
                throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
            }

            switch (pending.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                case SUCCESS:
                    LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.", checkpointId, message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                    if (pending.areTasksFullyAcknowledged()) {
                        this.completePendingCheckpoint(pending);
                    }
                    break;
                case DUPLICATE:
                    LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.", message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                    break;
                case UNKNOWN:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, because the task's execution attempt id was unknown. Discarding the state handle to avoid lingering state.", message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                    this.discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                    break;
                case DISCARDED:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, because the pending checkpoint had been discarded. Discarding the state handle tp avoid lingering state.", message.getCheckpointId(), message.getTaskExecutionId(), message.getJob(), taskManagerLocationInfo);
                    this.discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                    break;
            }
            return true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Turns a fully acknowledged pending checkpoint into a completed checkpoint.
     *
     * <p>The visible caller, {@code receiveAcknowledgeMessage}, invokes this method while holding the
     * coordinator lock.
     *
     * @param pendingCheckpoint the pending checkpoint to complete
     * @throws CheckpointException with reason {@code FINALIZE_CHECKPOINT_FAILURE} if the checkpoint cannot
     *     be finalized or cannot be added to the completed checkpoint store
     */
    private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
        CompletedCheckpoint completedCheckpoint;
        long checkpointId = pendingCheckpoint.getCheckpointId();
        // The operator states are registered with the shared state registry before finalization.
        Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
        this.sharedStateRegistry.registerAll(operatorStates.values());
        try {
            try {
                completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
                this.failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointId());
            }
            catch (Exception e1) {
                // Finalization failed: fail the pending checkpoint unless it is already discarded.
                if (!pendingCheckpoint.isDiscarded()) {
                    this.failPendingCheckpoint(pendingCheckpoint, CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
                }
                throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e1);
            }
            // After a successful finalization the pending checkpoint is expected to be discarded.
            Preconditions.checkState((pendingCheckpoint.isDiscarded() && completedCheckpoint != null ? 1 : 0) != 0);
            try {
                this.completedCheckpointStore.addCheckpoint(completedCheckpoint);
            }
            catch (Exception exception) {
                // Storing failed: discard the completed checkpoint on the executor, then report the failure.
                this.executor.execute(new Runnable(){

                    @Override
                    public void run() {
                        try {
                            completedCheckpoint.discardOnFailedStoring();
                        }
                        catch (Throwable t) {
                            LOG.warn("Could not properly discard completed checkpoint {}.", (Object)completedCheckpoint.getCheckpointID(), (Object)t);
                        }
                    }
                });
                throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, exception);
            }
        }
        finally {
            // Runs on success and on failure: the checkpoint is no longer pending either way.
            this.pendingCheckpoints.remove(checkpointId);
            this.triggerQueuedRequests();
        }
        // Everything below is only reached when the checkpoint was stored successfully.
        this.rememberRecentCheckpointId(checkpointId);
        this.dropSubsumedCheckpoints(checkpointId);
        this.lastCheckpointCompletionRelativeTime = this.clock.relativeTimeMillis();
        LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", new Object[]{checkpointId, this.job, completedCheckpoint.getStateSize(), completedCheckpoint.getDuration()});
        if (LOG.isDebugEnabled()) {
            StringBuilder builder = new StringBuilder();
            builder.append("Checkpoint state: ");
            for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
                builder.append(state);
                builder.append(", ");
            }
            // Drops the trailing ", "; with no operator states this removes the ": " of the prefix instead.
            builder.setLength(builder.length() - 2);
            LOG.debug(builder.toString());
        }
        // Notify the current execution attempt of every task to commit to; tasks without one are skipped.
        long timestamp = completedCheckpoint.getTimestamp();
        for (ExecutionVertex ev : this.tasksToCommitTo) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee == null) continue;
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Removes every pending checkpoint that the given execution attempt has not acknowledged and passes
     * it to {@code discardCheckpoint}.
     *
     * @param executionAttemptId the execution attempt to check the pending checkpoints against
     * @param cause the cause handed to {@code discardCheckpoint}
     */
    public void failUnacknowledgedPendingCheckpointsFor(ExecutionAttemptID executionAttemptId, Throwable cause) {
        synchronized (this.lock) {
            for (Iterator<PendingCheckpoint> it = this.pendingCheckpoints.values().iterator(); it.hasNext(); ) {
                final PendingCheckpoint candidate = it.next();
                if (!candidate.isAcknowledgedBy(executionAttemptId)) {
                    // Remove via the iterator first, then discard.
                    it.remove();
                    this.discardCheckpoint(candidate, cause, executionAttemptId);
                }
            }
        }
    }

    /**
     * Appends a checkpoint id to the queue of recent ids, evicting the oldest entry first when the queue
     * already holds 16 or more ids.
     *
     * @param id the checkpoint id to remember
     */
    private void rememberRecentCheckpointId(long id) {
        final boolean atCapacity = this.recentPendingCheckpoints.size() >= 16;
        if (atCapacity) {
            // pop() removes the head of the deque, i.e. the oldest remembered id.
            this.recentPendingCheckpoints.pop();
        }
        this.recentPendingCheckpoints.add(id);
    }

    /**
     * Fails and removes all pending checkpoints that are older than the given checkpoint and
     * report that they can be subsumed.
     *
     * @param checkpointId id of the checkpoint that subsumes older ones
     */
    private void dropSubsumedCheckpoints(long checkpointId) {
        final Iterator<PendingCheckpoint> it = pendingCheckpoints.values().iterator();
        while (it.hasNext()) {
            final PendingCheckpoint pending = it.next();
            final boolean isOlder = pending.getCheckpointId() < checkpointId;
            if (isOlder && pending.canBeSubsumed()) {
                rememberRecentCheckpointId(pending.getCheckpointId());
                // Fail while the entry is still in the map, then drop it via the iterator.
                failPendingCheckpoint(pending, CheckpointFailureReason.CHECKPOINT_SUBSUMED);
                it.remove();
            }
        }
    }

    /**
     * Fires a checkpoint trigger if a trigger request was queued, and clears the queued flag.
     *
     * <p>With periodic scheduling active, the current periodic trigger (if any) is cancelled and
     * replaced by one scheduled with zero initial delay. Otherwise a single
     * {@code ScheduledTrigger} is handed to the timer for execution.
     */
    private void triggerQueuedRequests() {
        if (!triggerRequestQueued) {
            return;
        }
        triggerRequestQueued = false;

        if (!periodicScheduling) {
            timer.execute(new ScheduledTrigger());
            return;
        }

        if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false);
        }
        currentPeriodicTrigger = scheduleTriggerWithDelay(0L);
    }

    /**
     * Map-based variant of the restore operation.
     *
     * @param tasks job vertices keyed by id; only the map's values are used
     * @param errorIfNoCheckpoint forwarded unchanged to the set-based overload
     * @param allowNonRestoredState forwarded unchanged to the set-based overload
     * @return the result of the set-based overload
     * @throws Exception whatever the set-based overload throws
     * @deprecated Thin adapter that copies the map's values into a set and delegates to
     *     {@link #restoreLatestCheckpointedState(Set, boolean, boolean)}.
     */
    @Deprecated
    public boolean restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex> tasks, boolean errorIfNoCheckpoint, boolean allowNonRestoredState) throws Exception {
        final Set<ExecutionJobVertex> vertices = new HashSet<ExecutionJobVertex>(tasks.values());
        return restoreLatestCheckpointedState(vertices, errorIfNoCheckpoint, allowNonRestoredState);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Restores the given tasks from the latest checkpoint known to the completed checkpoint store.
     *
     * <p>The whole operation runs while holding the coordinator lock. It replaces the shared state
     * registry, recovers the checkpoint store, re-registers the shared state of all recovered
     * checkpoints, and then assigns the state of the latest checkpoint to the tasks and restores
     * the master hooks.
     *
     * @param tasks job vertices that receive the restored state
     * @param errorIfNoCheckpoint if {@code true}, a missing checkpoint raises an exception instead
     *     of returning {@code false}
     * @param allowNonRestoredState forwarded to the state assignment and to the master hook
     *     restore; presumably permits checkpoint state that maps to no operator (confirm against
     *     {@code StateAssignmentOperation})
     * @return {@code true} if a checkpoint was found and applied, {@code false} if none was
     *     available and {@code errorIfNoCheckpoint} is {@code false}
     * @throws IllegalStateException if the coordinator is shut down, or if no checkpoint is
     *     available and {@code errorIfNoCheckpoint} is {@code true}
     * @throws Exception if any of the recovery steps fails
     */
    public boolean restoreLatestCheckpointedState(Set<ExecutionJobVertex> tasks, boolean errorIfNoCheckpoint, boolean allowNonRestoredState) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (this.shutdown) {
                throw new IllegalStateException("CheckpointCoordinator is shut down");
            }
            // Close the current shared state registry and swap in a fresh one from the factory.
            this.sharedStateRegistry.close();
            this.sharedStateRegistry = this.sharedStateRegistryFactory.create(this.executor);
            // Recover the store, then register every recovered checkpoint's shared state with the new registry.
            this.completedCheckpointStore.recover();
            for (CompletedCheckpoint completedCheckpoint : this.completedCheckpointStore.getAllCheckpoints()) {
                completedCheckpoint.registerSharedStatesAfterRestored(this.sharedStateRegistry);
            }
            LOG.debug("Status of the shared state registry of job {} after restore: {}.", (Object)this.job, (Object)this.sharedStateRegistry);
            CompletedCheckpoint latest = this.completedCheckpointStore.getLatestCheckpoint(this.isPreferCheckpointForRecovery);
            if (latest == null) {
                if (errorIfNoCheckpoint) {
                    throw new IllegalStateException("No completed checkpoint available");
                }
                // Nothing to restore from: only reset the master hooks and report "not restored".
                LOG.debug("Resetting the master hooks.");
                MasterHooks.reset(this.masterHooks.values(), LOG);
                return false;
            }
            LOG.info("Restoring job {} from latest valid checkpoint: {}.", (Object)this.job, (Object)latest);
            // Hand the checkpoint's operator states to the tasks.
            Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
            StateAssignmentOperation stateAssignmentOperation = new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);
            stateAssignmentOperation.assignStates();
            // Restore the master hooks from the state stored in the checkpoint.
            MasterHooks.restoreMasterHooks(this.masterHooks, latest.getMasterHookStates(), latest.getCheckpointID(), allowNonRestoredState, LOG);
            // Statistics are optional: report the restore only when a tracker is present.
            if (this.statsTracker != null) {
                long restoreTimestamp = System.currentTimeMillis();
                RestoredCheckpointStats restored = new RestoredCheckpointStats(latest.getCheckpointID(), latest.getProperties(), restoreTimestamp, latest.getExternalPointer());
                this.statsTracker.reportRestoredCheckpoint(restored);
            }
            return true;
        }
    }

    /**
     * Loads a savepoint, adds it to the completed checkpoint store, and restores the job from it.
     *
     * <p>After the savepoint is added, the checkpoint id counter is set to the savepoint's id plus
     * one, and the regular restore path is invoked with {@code errorIfNoCheckpoint = true}.
     *
     * @param savepointPointer pointer to the savepoint; must not be {@code null}
     * @param allowNonRestored forwarded to savepoint validation and to the restore call
     * @param tasks job vertices keyed by id
     * @param userClassLoader class loader passed to the savepoint loading step
     * @return the result of {@link #restoreLatestCheckpointedState(Set, boolean, boolean)}
     * @throws Exception if resolving, loading, storing, or restoring the savepoint fails
     */
    public boolean restoreSavepoint(String savepointPointer, boolean allowNonRestored, Map<JobVertexID, ExecutionJobVertex> tasks, ClassLoader userClassLoader) throws Exception {
        Preconditions.checkNotNull(savepointPointer, "The savepoint path cannot be null.");

        final String nonRestoredNote = allowNonRestored ? "allowing non restored state" : "";
        LOG.info("Starting job {} from savepoint {} ({})", job, savepointPointer, nonRestoredNote);

        final CompletedCheckpointStorageLocation location = checkpointStorage.resolveCheckpoint(savepointPointer);
        final CompletedCheckpoint savepoint = Checkpoints.loadAndValidateCheckpoint(job, tasks, location, userClassLoader, allowNonRestored);
        completedCheckpointStore.addCheckpoint(savepoint);

        // Continue numbering right after the savepoint's id.
        final long nextCheckpointId = savepoint.getCheckpointID() + 1L;
        checkpointIdCounter.setCount(nextCheckpointId);
        LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);

        final Set<ExecutionJobVertex> vertices = new HashSet<ExecutionJobVertex>(tasks.values());
        return restoreLatestCheckpointedState(vertices, true, allowNonRestored);
    }

    /**
     * Returns the current size of the pending checkpoint map.
     *
     * <p>NOTE(review): unlike the neighbouring accessors, this read does not take the coordinator
     * lock; confirm that callers tolerate a possibly stale value.
     *
     * @return number of entries in the pending checkpoint map
     */
    public int getNumberOfPendingCheckpoints() {
        return pendingCheckpoints.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns how many checkpoints the completed checkpoint store reports as retained.
     *
     * <p>The store is queried while holding the coordinator lock.
     *
     * @return the store's retained checkpoint count
     */
    public int getNumberOfRetainedSuccessfulCheckpoints() {
        final int retained;
        synchronized (lock) {
            retained = completedCheckpointStore.getNumberOfRetainedCheckpoints();
        }
        return retained;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns a copy of the pending checkpoint map, taken while holding the coordinator lock.
     *
     * <p>The returned map is a new {@link HashMap}; changing it does not affect the coordinator.
     *
     * @return snapshot of pending checkpoints keyed by checkpoint id
     */
    public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
        final Map<Long, PendingCheckpoint> snapshot;
        synchronized (lock) {
            snapshot = new HashMap<Long, PendingCheckpoint>(pendingCheckpoints);
        }
        return snapshot;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns all checkpoints held by the completed checkpoint store.
     *
     * <p>The store is queried while holding the coordinator lock; the list is returned exactly as
     * the store provides it.
     *
     * @return the store's list of completed checkpoints
     * @throws Exception if the store fails to provide the checkpoints
     */
    public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
        final List<CompletedCheckpoint> completed;
        synchronized (lock) {
            completed = completedCheckpointStore.getAllCheckpoints();
        }
        return completed;
    }

    /**
     * Returns the checkpoint storage view used by this coordinator.
     *
     * @return the coordinator's checkpoint storage
     */
    public CheckpointStorageCoordinatorView getCheckpointStorage() {
        return checkpointStorage;
    }

    /**
     * Returns the store that holds this coordinator's completed checkpoints.
     *
     * @return the completed checkpoint store
     */
    public CompletedCheckpointStore getCheckpointStore() {
        return completedCheckpointStore;
    }

    /**
     * Returns the counter this coordinator uses for checkpoint ids.
     *
     * @return the checkpoint id counter
     */
    public CheckpointIDCounter getCheckpointIdCounter() {
        return checkpointIdCounter;
    }

    /**
     * Returns the checkpoint timeout held by this coordinator.
     *
     * @return the checkpoint timeout (unit not visible here; presumably milliseconds, to be confirmed)
     */
    public long getCheckpointTimeout() {
        return checkpointTimeout;
    }

    /**
     * Tells whether a periodic trigger handle is currently stored.
     *
     * @return {@code true} if the current periodic trigger is non-null
     */
    @VisibleForTesting
    boolean isCurrentPeriodicTriggerAvailable() {
        return null != currentPeriodicTrigger;
    }

    /**
     * Tells whether periodic checkpointing is configured.
     *
     * @return {@code false} only when the base interval equals {@link Long#MAX_VALUE}
     */
    public boolean isPeriodicCheckpointingConfigured() {
        final boolean intervalUnset = baseInterval == Long.MAX_VALUE;
        return !intervalUnset;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts periodic checkpoint scheduling.
     *
     * <p>Under the coordinator lock, any previous scheduling is stopped first (which also aborts
     * pending checkpoints), then a new periodic trigger is scheduled with a random initial delay.
     *
     * @throws IllegalArgumentException if the coordinator is already shut down
     */
    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }

            // Reset any earlier scheduling state before arming a new trigger.
            stopCheckpointScheduler();

            periodicScheduling = true;
            final long initialDelay = getRandomInitDelay();
            currentPeriodicTrigger = scheduleTriggerWithDelay(initialDelay);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stops periodic checkpoint scheduling.
     *
     * <p>Under the coordinator lock this clears the queued-request and periodic flags, cancels the
     * current periodic trigger without interrupting it, aborts all pending checkpoints with reason
     * {@code CHECKPOINT_COORDINATOR_SUSPEND}, and resets the unsuccessful-trigger counter to zero.
     */
    public void stopCheckpointScheduler() {
        synchronized (lock) {
            periodicScheduling = false;
            triggerRequestQueued = false;

            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }

            final CheckpointException suspended = new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SUSPEND);
            abortPendingCheckpoints(suspended);
            numUnsuccessfulCheckpointsTriggers.set(0);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Aborts all pending checkpoints and clears the pending checkpoint map.
     *
     * <p>Runs while holding the coordinator lock. Each pending checkpoint is failed with the
     * failure reason and the underlying cause of the supplied exception, so the original cause is
     * no longer lost on the way to the checkpoint and the failure manager.
     *
     * @param exception exception describing why the checkpoints are aborted; its failure reason
     *     and its cause (which may be {@code null}) are forwarded for every pending checkpoint
     */
    public void abortPendingCheckpoints(CheckpointException exception) {
        synchronized (lock) {
            for (PendingCheckpoint pending : pendingCheckpoints.values()) {
                // Forward the cause as well as the reason; previously only the reason was kept.
                failPendingCheckpoint(pending, exception.getCheckpointFailureReason(), exception.getCause());
            }
            pendingCheckpoints.clear();
        }
    }

    /**
     * Rejects a trigger attempt when the limit of concurrent checkpoint attempts is reached.
     *
     * <p>On rejection the request is marked as queued and the current periodic trigger, if any,
     * is cancelled and cleared.
     *
     * @throws CheckpointException with reason {@code TOO_MANY_CONCURRENT_CHECKPOINTS} when the
     *     number of pending checkpoints has reached the configured maximum
     */
    private void checkConcurrentCheckpoints() throws CheckpointException {
        final boolean belowLimit = pendingCheckpoints.size() < maxConcurrentCheckpointAttempts;
        if (belowLimit) {
            return;
        }

        triggerRequestQueued = true;
        if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false);
            currentPeriodicTrigger = null;
        }
        throw new CheckpointException(CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
    }

    /**
     * Rejects a trigger attempt that comes too soon after the last completed checkpoint.
     *
     * <p>If the minimum pause has not elapsed yet, the current periodic trigger is cancelled and a
     * new one is scheduled to fire once the remaining pause is over.
     *
     * @throws CheckpointException with reason {@code MINIMUM_TIME_BETWEEN_CHECKPOINTS} when the
     *     minimum pause has not elapsed
     */
    private void checkMinPauseBetweenCheckpoints() throws CheckpointException {
        final long earliestNextTrigger = lastCheckpointCompletionRelativeTime + minPauseBetweenCheckpoints;
        final long remainingMillis = earliestNextTrigger - clock.relativeTimeMillis();
        if (remainingMillis <= 0L) {
            return;
        }

        if (currentPeriodicTrigger != null) {
            currentPeriodicTrigger.cancel(false);
            currentPeriodicTrigger = null;
        }
        // Re-arm the periodic trigger so it fires when the pause has elapsed.
        currentPeriodicTrigger = scheduleTriggerWithDelay(remainingMillis);
        throw new CheckpointException(CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
    }

    /**
     * Picks a random initial delay between the minimum pause and the base interval (inclusive).
     *
     * <p>The exclusive upper bound is normally {@code baseInterval + 1}. A base interval of
     * {@link Long#MAX_VALUE} would overflow that sum to a negative bound and make
     * {@code nextLong} throw, so the bound is capped at {@code Long.MAX_VALUE} in that case. If
     * the range is empty (minimum pause not below the bound), the minimum pause is returned
     * instead of throwing.
     *
     * @return a delay that is at least the minimum pause between checkpoints
     */
    private long getRandomInitDelay() {
        final long origin = this.minPauseBetweenCheckpoints;
        // Guard against overflow of baseInterval + 1 when no finite interval is configured.
        final long boundExclusive = this.baseInterval == Long.MAX_VALUE ? Long.MAX_VALUE : this.baseInterval + 1L;
        if (origin >= boundExclusive) {
            // nextLong(origin, bound) requires origin < bound.
            return origin;
        }
        return ThreadLocalRandom.current().nextLong(origin, boundExclusive);
    }

    /**
     * Schedules a new {@code ScheduledTrigger} on the timer at a fixed rate.
     *
     * @param initDelay delay before the first run, in milliseconds
     * @return handle of the scheduled periodic task; the period is the base interval in milliseconds
     */
    private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
        final ScheduledTrigger trigger = new ScheduledTrigger();
        return timer.scheduleAtFixedRate(trigger, initDelay, baseInterval, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Returns the job status listener tied to this coordinator, creating it on first use.
     *
     * <p>Creation and lookup happen while holding the coordinator lock, so repeated calls return
     * the same {@code CheckpointCoordinatorDeActivator} instance.
     *
     * @return the coordinator's job status listener
     * @throws IllegalArgumentException if the coordinator is already shut down
     */
    public JobStatusListener createActivatorDeactivator() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            if (null == jobStatusListener) {
                jobStatusListener = new CheckpointCoordinatorDeActivator(this);
            }
            return jobStatusListener;
        }
    }

    /**
     * Fails a pending checkpoint because of a task-level problem and, if no newer live checkpoint
     * is pending, fires any queued trigger request.
     *
     * <p>The failure reason is derived from {@code cause}: {@code CHECKPOINT_DECLINED} when it is
     * {@code null}, the exception's own reason when it is a {@code CheckpointException}, and
     * {@code JOB_FAILURE} otherwise. The caller is expected to hold the coordinator lock (checked
     * by an assertion).
     *
     * @param pendingCheckpoint checkpoint to discard; must not be {@code null}
     * @param cause reason for discarding, may be {@code null}
     * @param executionAttemptID execution attempt the failure is attributed to
     */
    private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Throwable cause, ExecutionAttemptID executionAttemptID) {
        assert Thread.holdsLock(lock);
        Preconditions.checkNotNull(pendingCheckpoint);

        final long checkpointId = pendingCheckpoint.getCheckpointId();
        LOG.info("Discarding checkpoint {} of job {}.", checkpointId, job, cause);

        // Map the cause to a failure reason; a null cause is passed through unchanged.
        final CheckpointFailureReason reason;
        if (cause == null) {
            reason = CheckpointFailureReason.CHECKPOINT_DECLINED;
        } else if (cause instanceof CheckpointException) {
            reason = ((CheckpointException) cause).getCheckpointFailureReason();
        } else {
            reason = CheckpointFailureReason.JOB_FAILURE;
        }
        failPendingCheckpointDueToTaskFailure(pendingCheckpoint, reason, cause, executionAttemptID);

        rememberRecentCheckpointId(checkpointId);

        // A non-discarded checkpoint with an id at least as high still counts as "in flight".
        final boolean haveMoreRecentPending = pendingCheckpoints.values().stream()
            .anyMatch(p -> !p.isDiscarded() && p.getCheckpointId() >= pendingCheckpoint.getCheckpointId());
        if (!haveMoreRecentPending) {
            triggerQueuedRequests();
        }
    }

    /**
     * Discards a subtask's state on the coordinator's executor.
     *
     * <p>Does nothing when {@code subtaskState} is {@code null}. Any {@link Throwable} raised
     * while discarding is logged as a warning and not propagated.
     *
     * @param jobId job the state belongs to (used for logging only)
     * @param executionAttemptID task attempt the state belongs to (used for logging only)
     * @param checkpointId checkpoint the state belongs to (used for logging only)
     * @param subtaskState state to discard, may be {@code null}
     */
    private void discardSubtaskState(final JobID jobId, final ExecutionAttemptID executionAttemptID, final long checkpointId, final TaskStateSnapshot subtaskState) {
        if (subtaskState == null) {
            return;
        }
        executor.execute(() -> {
            try {
                subtaskState.discardState();
            } catch (Throwable failure) {
                LOG.warn("Could not properly discard state object of checkpoint {} belonging to task {} of job {}.", checkpointId, executionAttemptID, jobId, failure);
            }
        });
    }

    /**
     * Fails a pending checkpoint at job level without an underlying cause.
     *
     * @param pendingCheckpoint checkpoint to fail
     * @param reason failure reason to report
     */
    private void failPendingCheckpoint(PendingCheckpoint pendingCheckpoint, CheckpointFailureReason reason) {
        final Throwable noCause = null;
        failPendingCheckpoint(pendingCheckpoint, reason, noCause);
    }

    /**
     * Fails a pending checkpoint at job level.
     *
     * <p>The checkpoint is aborted, the failure manager is told about a job-level checkpoint
     * exception, and the periodic scheduler is re-armed if needed.
     *
     * @param pendingCheckpoint checkpoint to fail
     * @param reason failure reason to report
     * @param cause underlying cause, may be {@code null}
     */
    private void failPendingCheckpoint(PendingCheckpoint pendingCheckpoint, CheckpointFailureReason reason, @Nullable Throwable cause) {
        final CheckpointException failure = new CheckpointException(reason, cause);
        pendingCheckpoint.abort(reason, cause);

        final long failedId = pendingCheckpoint.getCheckpointId();
        failureManager.handleJobLevelCheckpointException(failure, failedId);

        checkAndResetCheckpointScheduler();
    }

    /**
     * Fails a pending checkpoint at task level without an underlying cause.
     *
     * @param pendingCheckpoint checkpoint to fail
     * @param reason failure reason to report
     * @param executionAttemptID execution attempt the failure is attributed to
     */
    private void failPendingCheckpointDueToTaskFailure(PendingCheckpoint pendingCheckpoint, CheckpointFailureReason reason, ExecutionAttemptID executionAttemptID) {
        final Throwable noCause = null;
        failPendingCheckpointDueToTaskFailure(pendingCheckpoint, reason, noCause, executionAttemptID);
    }

    /**
     * Fails a pending checkpoint at task level.
     *
     * <p>The checkpoint is aborted, the failure manager is told about a task-level checkpoint
     * exception for the given execution attempt, and the periodic scheduler is re-armed if needed.
     *
     * @param pendingCheckpoint checkpoint to fail
     * @param reason failure reason to report
     * @param cause underlying cause, may be {@code null}
     * @param executionAttemptID execution attempt the failure is attributed to
     */
    private void failPendingCheckpointDueToTaskFailure(PendingCheckpoint pendingCheckpoint, CheckpointFailureReason reason, @Nullable Throwable cause, ExecutionAttemptID executionAttemptID) {
        final CheckpointException failure = new CheckpointException(reason, cause);
        pendingCheckpoint.abort(reason, cause);

        final long failedId = pendingCheckpoint.getCheckpointId();
        failureManager.handleTaskLevelCheckpointException(failure, failedId, executionAttemptID);

        checkAndResetCheckpointScheduler();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Re-arms the periodic trigger when periodic scheduling is on but no trigger is stored.
     *
     * <p>Nothing happens if the coordinator is shut down, periodic scheduling is off, or a
     * trigger already exists. Otherwise, under the coordinator lock, a new trigger with a random
     * initial delay is scheduled once no pending checkpoint is left or all of them are discarded;
     * the queued-request flag is cleared at the same time.
     */
    private void checkAndResetCheckpointScheduler() {
        final boolean triggerMissing = !shutdown && periodicScheduling && currentPeriodicTrigger == null;
        if (!triggerMissing) {
            return;
        }
        synchronized (lock) {
            if (pendingCheckpoints.isEmpty() || allPendingCheckpointsDiscarded()) {
                triggerRequestQueued = false;
                final long initialDelay = getRandomInitDelay();
                currentPeriodicTrigger = scheduleTriggerWithDelay(initialDelay);
            }
        }
    }

    /**
     * Tells whether every pending checkpoint is discarded.
     *
     * @return {@code true} if the pending map is empty or all of its checkpoints are discarded
     */
    private boolean allPendingCheckpointsDiscarded() {
        for (PendingCheckpoint pending : pendingCheckpoints.values()) {
            if (!pending.isDiscarded()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Validates that a checkpoint may be triggered right now.
     *
     * <p>Checks, in order: coordinator shutdown, periodic scheduling being active for periodic
     * triggers, and (unless forced) an already queued request, the concurrent checkpoint limit,
     * and the minimum pause between checkpoints.
     *
     * @param isPeriodic whether the trigger comes from the periodic scheduler
     * @param forceCheckpoint whether to skip the queue, concurrency, and min-pause checks
     * @throws CheckpointException with the reason of the first check that fails
     */
    private void preCheckBeforeTriggeringCheckpoint(boolean isPeriodic, boolean forceCheckpoint) throws CheckpointException {
        if (shutdown) {
            throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
        }
        if (isPeriodic && !periodicScheduling) {
            throw new CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
        }
        if (forceCheckpoint) {
            // Forced checkpoints bypass the remaining throttling checks.
            return;
        }
        if (triggerRequestQueued) {
            LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
            throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
        }
        checkConcurrentCheckpoints();
        checkMinPauseBetweenCheckpoints();
    }

    private final class ScheduledTrigger
    implements Runnable {
        private ScheduledTrigger() {
        }

        @Override
        public void run() {
            try {
                CheckpointCoordinator.this.triggerCheckpoint(System.currentTimeMillis(), true);
            }
            catch (Exception e) {
                LOG.error("Exception while triggering checkpoint for job {}.", (Object)CheckpointCoordinator.this.job, (Object)e);
            }
        }
    }
}

