package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.LongConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.FinishedTaskStateProvider;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinator.class */
public class CheckpointCoordinator {
    /** Logger for this class (no initializer is visible in this decompiled listing). */
    private static final Logger LOG;
    // NOTE(review): presumably the capacity of recentPendingCheckpoints (the constructor
    // sizes that deque with the literal 16) - confirm against the original source.
    private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
    /** Monitor used by the synchronized blocks of this class; guards {@link #pendingCheckpoints}. */
    private final Object lock;
    /** ID of the job; used in log messages and passed to every created PendingCheckpoint. */
    private final JobID job;
    /** Properties for regular checkpoints, built in the constructor from the configured retention policy. */
    private final CheckpointProperties checkpointProperties;
    /** Executor used for the asynchronous trigger stages (ID allocation, location initialization, master hooks). */
    private final Executor executor;
    private final CheckpointsCleaner checkpointsCleaner;
    /** Unmodifiable view of the operator coordinator contexts handed to the constructor. */
    private final Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint;

    /** Pending checkpoints keyed by checkpoint ID (a LinkedHashMap, so iteration follows insertion order). */
    @GuardedBy("lock")
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    private final CompletedCheckpointStore completedCheckpointStore;
    /** Coordinator-side storage view created from the CheckpointStorage for this job. */
    private final CheckpointStorageCoordinatorView checkpointStorageView;
    private final ArrayDeque<Long> recentPendingCheckpoints;
    private final CheckpointIDCounter checkpointIdCounter;
    /** Checkpoint interval, raised in the constructor to at least {@link #minPauseBetweenCheckpoints}. */
    private final long baseInterval;
    /** Delay in milliseconds after which a CheckpointCanceller is scheduled for a pending checkpoint. */
    private final long checkpointTimeout;
    /** Minimum pause between checkpoints, capped in the constructor at 31536000000 ms. */
    private final long minPauseBetweenCheckpoints;
    /** Scheduled executor on which trigger stages run and checkpoint timeouts are scheduled. */
    private final ScheduledExecutor timer;
    /** Registered master hooks keyed by their identifier. */
    private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;
    private final boolean unalignedCheckpointsEnabled;
    private final long alignedCheckpointTimeout;
    private JobStatusListener jobStatusListener;
    private ScheduledFuture<?> currentPeriodicTrigger;
    private long lastCheckpointCompletionRelativeTime;
    private boolean periodicScheduling;
    /** Set to true (under {@link #lock}) by shutdown(); read without the lock by isShutdown(). */
    private volatile boolean shutdown;
    private final CheckpointStatsTracker statsTracker;
    private final BiFunction<Set<ExecutionJobVertex>, Map<OperatorID, OperatorState>, VertexFinishedStateChecker> vertexFinishedStateCheckerFactory;
    private final long checkpointIdOfIgnoredInFlightData;
    private final CheckpointFailureManager failureManager;
    private final Clock clock;
    private final boolean isExactlyOnceMode;
    /** True while a trigger pipeline started by startTriggeringCheckpoint is in flight; cleared by onTriggerSuccess. */
    private boolean isTriggering;
    private final CheckpointRequestDecider requestDecider;
    private final CheckpointPlanCalculator checkpointPlanCalculator;
    private final ExecutionAttemptMappingProvider attemptMappingProvider;
    /** Whether the base checkpoint locations have been initialized (eagerly in the constructor or lazily on first trigger). */
    private boolean baseLocationsForCheckpointInitialized;
    /** When true, non-savepoint checkpoints are triggered with CheckpointType.FULL_CHECKPOINT (see triggerTasks). */
    private boolean forceFullSnapshot;
    // Compiler-generated flag backing the `assert` statement.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.checkpoint.CheckpointCoordinator$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinator$2.class */
    /**
     * Compiler-generated lookup tables that back {@code switch} statements over enums.
     *
     * <p>Each table is indexed by an enum constant's {@code ordinal()} and holds the 1-based
     * case number used by the compiled {@code switch}; slots of constants that are missing at
     * runtime stay at 0 and therefore fall into the {@code default} branch.
     *
     * <p>The decompiled listing only declared the {@code CheckpointType} table although the static
     * initializer also fills tables for {@code RestoreMode} and
     * {@code PendingCheckpoint.TaskAcknowledgeResult}. Both declarations (and the allocation of the
     * {@code RestoreMode} table) are restored here so that the initializer is well-formed.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$jobgraph$RestoreMode = new int[RestoreMode.values().length];
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$checkpoint$PendingCheckpoint$TaskAcknowledgeResult;
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$core$execution$CheckpointType;

        static {
            // Every assignment is guarded individually: a constant that no longer exists at
            // runtime raises NoSuchFieldError, which is deliberately ignored so that the
            // remaining entries are still populated.
            try {
                $SwitchMap$org$apache$flink$runtime$jobgraph$RestoreMode[RestoreMode.CLAIM.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; slot stays 0
            }
            try {
                $SwitchMap$org$apache$flink$runtime$jobgraph$RestoreMode[RestoreMode.LEGACY.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; slot stays 0
            }
            try {
                $SwitchMap$org$apache$flink$runtime$jobgraph$RestoreMode[RestoreMode.NO_CLAIM.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; slot stays 0
            }
            $SwitchMap$org$apache$flink$runtime$checkpoint$PendingCheckpoint$TaskAcknowledgeResult = new int[PendingCheckpoint.TaskAcknowledgeResult.values().length];
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$PendingCheckpoint$TaskAcknowledgeResult[PendingCheckpoint.TaskAcknowledgeResult.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; slot stays 0
            }
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$PendingCheckpoint$TaskAcknowledgeResult[PendingCheckpoint.TaskAcknowledgeResult.DUPLICATE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; slot stays 0
            }
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$PendingCheckpoint$TaskAcknowledgeResult[PendingCheckpoint.TaskAcknowledgeResult.UNKNOWN.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; slot stays 0
            }
            try {
                $SwitchMap$org$apache$flink$runtime$checkpoint$PendingCheckpoint$TaskAcknowledgeResult[PendingCheckpoint.TaskAcknowledgeResult.DISCARDED.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; slot stays 0
            }
            $SwitchMap$org$apache$flink$core$execution$CheckpointType = new int[org.apache.flink.core.execution.CheckpointType.values().length];
            try {
                $SwitchMap$org$apache$flink$core$execution$CheckpointType[org.apache.flink.core.execution.CheckpointType.CONFIGURED.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; slot stays 0
            }
            try {
                $SwitchMap$org$apache$flink$core$execution$CheckpointType[org.apache.flink.core.execution.CheckpointType.FULL.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; slot stays 0
            }
            try {
                $SwitchMap$org$apache$flink$core$execution$CheckpointType[org.apache.flink.core.execution.CheckpointType.INCREMENTAL.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant absent at runtime; slot stays 0
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinator$CheckpointCanceller.class */
    /**
     * Timeout task for a single pending checkpoint: when run, it aborts the checkpoint with
     * {@code CHECKPOINT_EXPIRED} unless the checkpoint has already been disposed.
     */
    public class CheckpointCanceller implements Runnable {
        private final PendingCheckpoint pendingCheckpoint;

        private CheckpointCanceller(PendingCheckpoint pendingCheckpoint) {
            Preconditions.checkNotNull(pendingCheckpoint);
            this.pendingCheckpoint = pendingCheckpoint;
        }

        /** Aborts the tracked checkpoint under the coordinator lock if it is still alive. */
        @Override // java.lang.Runnable
        public void run() {
            final CheckpointCoordinator coordinator = CheckpointCoordinator.this;
            synchronized (coordinator.lock) {
                if (pendingCheckpoint.isDisposed()) {
                    // Already completed or aborted elsewhere; nothing left to expire.
                    return;
                }
                final long expiredId = pendingCheckpoint.getCheckpointID();
                LOG.info("Checkpoint {} of job {} expired before completing.", expiredId, coordinator.job);
                coordinator.abortPendingCheckpoint(pendingCheckpoint, new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinator$CheckpointTriggerRequest.class */
    /**
     * A request to trigger one checkpoint or savepoint, together with the future that reports
     * its outcome to the requester.
     */
    public static class CheckpointTriggerRequest {
        /** Creation time of this request in epoch milliseconds. */
        final long timestamp = System.currentTimeMillis();

        /** Future handed out to the requester; completed with the checkpoint or a failure. */
        private final CompletableFuture<CompletedCheckpoint> onCompletionPromise = new CompletableFuture<>();

        final CheckpointProperties props;

        @Nullable
        final String externalSavepointLocation;

        final boolean isPeriodic;

        CheckpointTriggerRequest(CheckpointProperties checkpointProperties, @Nullable String str, boolean z) {
            Preconditions.checkNotNull(checkpointProperties);
            this.props = checkpointProperties;
            this.externalSavepointLocation = str;
            this.isPeriodic = z;
        }

        /** Returns the future that is completed once this request has succeeded or failed. */
        CompletableFuture<CompletedCheckpoint> getOnCompletionFuture() {
            return onCompletionPromise;
        }

        /** Fails the completion future with the given exception. */
        public void completeExceptionally(CheckpointException checkpointException) {
            getOnCompletionFuture().completeExceptionally(checkpointException);
        }

        /** Returns whatever {@code forceCheckpoint()} reports for this request's properties. */
        public boolean isForce() {
            final CheckpointProperties requestProps = this.props;
            return requestProps.forceCheckpoint();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinator$OperatorCoordinatorRestoreBehavior.class */
    /**
     * Options for how operator coordinators are treated during a restore.
     *
     * <p>NOTE(review): the consumers of this enum are outside the visible part of the file; the
     * per-constant remarks below are inferred from the constant names only - confirm.
     */
    public enum OperatorCoordinatorRestoreBehavior {
        // presumably: restore from the checkpoint, or reset when there is none
        RESTORE_OR_RESET,
        // presumably: restore only when a checkpoint exists
        RESTORE_IF_CHECKPOINT_PRESENT,
        // presumably: leave the coordinators untouched
        SKIP
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinator$ScheduledTrigger.class */
    public final class ScheduledTrigger implements Runnable {
        private ScheduledTrigger() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                CheckpointCoordinator.this.triggerCheckpoint(CheckpointCoordinator.this.checkpointProperties, null, true);
            } catch (Exception e) {
                CheckpointCoordinator.LOG.error("Exception while triggering checkpoint for job {}.", CheckpointCoordinator.this.job, e);
            }
        }
    }

    /**
     * Creates a coordinator that uses the system clock and the default
     * {@code VertexFinishedStateChecker} constructor as checker factory; all other arguments are
     * forwarded unchanged to the full constructor.
     */
    public CheckpointCoordinator(JobID jobID, CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration, Collection<OperatorCoordinatorCheckpointContext> collection, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, CheckpointStorage checkpointStorage, Executor executor, CheckpointsCleaner checkpointsCleaner, ScheduledExecutor scheduledExecutor, CheckpointFailureManager checkpointFailureManager, CheckpointPlanCalculator checkpointPlanCalculator, ExecutionAttemptMappingProvider executionAttemptMappingProvider, CheckpointStatsTracker checkpointStatsTracker) {
        this(jobID, checkpointCoordinatorConfiguration, collection, checkpointIDCounter, completedCheckpointStore, checkpointStorage, executor, checkpointsCleaner, scheduledExecutor, checkpointFailureManager, checkpointPlanCalculator, executionAttemptMappingProvider, SystemClock.getInstance(), checkpointStatsTracker, VertexFinishedStateChecker::new);
    }

    /**
     * Full constructor; additionally accepts the clock and the factory for
     * {@code VertexFinishedStateChecker} instances.
     *
     * <p>Besides storing its arguments it creates the coordinator-side checkpoint storage view,
     * eagerly initializes the base checkpoint locations when periodic checkpointing is configured,
     * starts the checkpoint ID counter and builds the {@code CheckpointRequestDecider}.
     *
     * @throws NullPointerException presumably, when one of the arguments passed through
     *     {@code Preconditions.checkNotNull} outside the inner try block is null - confirm
     * @throws FlinkRuntimeException if creating the storage view or initializing the base
     *     locations fails with an {@link IOException}
     * @throws RuntimeException if anything inside the inner try block fails (see the note there)
     */
    @VisibleForTesting
    public CheckpointCoordinator(JobID jobID, CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration, Collection<OperatorCoordinatorCheckpointContext> collection, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, CheckpointStorage checkpointStorage, Executor executor, CheckpointsCleaner checkpointsCleaner, ScheduledExecutor scheduledExecutor, CheckpointFailureManager checkpointFailureManager, CheckpointPlanCalculator checkpointPlanCalculator, ExecutionAttemptMappingProvider executionAttemptMappingProvider, Clock clock, CheckpointStatsTracker checkpointStatsTracker, BiFunction<Set<ExecutionJobVertex>, Map<OperatorID, OperatorState>, VertexFinishedStateChecker> biFunction) {
        this.lock = new Object();
        this.isTriggering = false;
        this.baseLocationsForCheckpointInitialized = false;
        Preconditions.checkNotNull(checkpointStorage);
        // Cap the minimum pause at 31536000000 ms (365 days).
        long minPauseBetweenCheckpoints = checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints();
        minPauseBetweenCheckpoints = minPauseBetweenCheckpoints > 31536000000L ? 31536000000L : minPauseBetweenCheckpoints;
        // The interval can never be shorter than the (capped) minimum pause.
        long checkpointInterval = checkpointCoordinatorConfiguration.getCheckpointInterval();
        checkpointInterval = checkpointInterval < minPauseBetweenCheckpoints ? minPauseBetweenCheckpoints : checkpointInterval;
        this.job = (JobID) Preconditions.checkNotNull(jobID);
        this.baseInterval = checkpointInterval;
        this.checkpointTimeout = checkpointCoordinatorConfiguration.getCheckpointTimeout();
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
        this.coordinatorsToCheckpoint = Collections.unmodifiableCollection(collection);
        this.pendingCheckpoints = new LinkedHashMap();
        this.checkpointIdCounter = (CheckpointIDCounter) Preconditions.checkNotNull(checkpointIDCounter);
        this.completedCheckpointStore = (CompletedCheckpointStore) Preconditions.checkNotNull(completedCheckpointStore);
        this.executor = (Executor) Preconditions.checkNotNull(executor);
        this.checkpointsCleaner = (CheckpointsCleaner) Preconditions.checkNotNull(checkpointsCleaner);
        this.failureManager = (CheckpointFailureManager) Preconditions.checkNotNull(checkpointFailureManager);
        this.checkpointPlanCalculator = (CheckpointPlanCalculator) Preconditions.checkNotNull(checkpointPlanCalculator);
        this.attemptMappingProvider = (ExecutionAttemptMappingProvider) Preconditions.checkNotNull(executionAttemptMappingProvider);
        this.clock = (Clock) Preconditions.checkNotNull(clock);
        this.isExactlyOnceMode = checkpointCoordinatorConfiguration.isExactlyOnce();
        this.unalignedCheckpointsEnabled = checkpointCoordinatorConfiguration.isUnalignedCheckpointsEnabled();
        this.alignedCheckpointTimeout = checkpointCoordinatorConfiguration.getAlignedCheckpointTimeout();
        this.checkpointIdOfIgnoredInFlightData = checkpointCoordinatorConfiguration.getCheckpointIdOfIgnoredInFlightData();
        // NOTE(review): the literal 16 presumably corresponds to NUM_GHOST_CHECKPOINT_IDS - confirm.
        this.recentPendingCheckpoints = new ArrayDeque<>(16);
        this.masterHooks = new HashMap<>();
        // Unlike most collaborators, the timer is stored without a null check.
        this.timer = scheduledExecutor;
        this.checkpointProperties = CheckpointProperties.forCheckpoint(checkpointCoordinatorConfiguration.getCheckpointRetentionPolicy());
        try {
            this.checkpointStorageView = checkpointStorage.createCheckpointStorage(jobID);
            // With periodic checkpointing the base locations are set up eagerly; otherwise
            // startTriggeringCheckpoint requests the initialization on the first trigger.
            if (isPeriodicCheckpointingConfigured()) {
                this.checkpointStorageView.initializeBaseLocationsForCheckpoint();
                this.baseLocationsForCheckpointInitialized = true;
            }
            // NOTE(review): as decompiled, this try block spans more than the counter start, so
            // a failure while building the request decider or in the two null checks below is
            // also reported as "Failed to start checkpoint ID counter".
            try {
                checkpointIDCounter.start();
                int maxConcurrentCheckpoints = checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints();
                LongConsumer longConsumer = this::rescheduleTrigger;
                Clock clock2 = this.clock;
                long j = this.minPauseBetweenCheckpoints;
                Map<Long, PendingCheckpoint> map = this.pendingCheckpoints;
                // presumably the null check the compiler emits for a bound method reference
                map.getClass();
                IntSupplier intSupplier = map::size;
                CheckpointsCleaner checkpointsCleaner2 = this.checkpointsCleaner;
                // presumably the null check the compiler emits for a bound method reference
                checkpointsCleaner2.getClass();
                this.requestDecider = new CheckpointRequestDecider(maxConcurrentCheckpoints, longConsumer, clock2, j, intSupplier, checkpointsCleaner2::getNumberOfCheckpointsToClean);
                this.statsTracker = (CheckpointStatsTracker) Preconditions.checkNotNull(checkpointStatsTracker, "Statistic tracker can not be null");
                this.vertexFinishedStateCheckerFactory = (BiFunction) Preconditions.checkNotNull(biFunction);
            } catch (Throwable th) {
                throw new RuntimeException("Failed to start checkpoint ID counter: " + th.getMessage(), th);
            }
        } catch (IOException e) {
            throw new FlinkRuntimeException("Failed to create checkpoint storage at checkpoint coordinator side.", e);
        }
    }

    /**
     * Registers a master hook under its identifier.
     *
     * @param masterTriggerRestoreHook the hook to add; must not be null and must report an
     *     identifier that is neither null nor whitespace-only
     * @return {@code true} if the hook was stored, {@code false} if a hook with the same
     *     identifier was already registered (the existing hook is kept)
     */
    public boolean addMasterHook(MasterTriggerRestoreHook<?> masterTriggerRestoreHook) {
        Preconditions.checkNotNull(masterTriggerRestoreHook);
        final String hookId = masterTriggerRestoreHook.getIdentifier();
        final boolean usableId = !StringUtils.isNullOrWhitespaceOnly(hookId);
        Preconditions.checkArgument(usableId, "The hook has a null or empty id");
        synchronized (this.lock) {
            final boolean notYetRegistered = !this.masterHooks.containsKey(hookId);
            if (notYetRegistered) {
                this.masterHooks.put(hookId, masterTriggerRestoreHook);
            }
            return notYetRegistered;
        }
    }

    /** Returns the number of currently registered master hooks, read under the coordinator lock. */
    public int getNumberOfRegisteredMasterHooks() {
        synchronized (this.lock) {
            return this.masterHooks.size();
        }
    }

    /**
     * Shuts the coordinator down: stops periodic scheduling, closes and clears the master hooks
     * and aborts all pending and queued checkpoints. Calls after the first one are no-ops.
     *
     * @throws Exception if closing the hooks or aborting the checkpoints fails
     */
    public void shutdown() throws Exception {
        synchronized (this.lock) {
            if (this.shutdown) {
                // A previous call already performed the shutdown.
                return;
            }
            this.shutdown = true;
            LOG.info("Stopping checkpoint coordinator for job {}.", this.job);
            this.periodicScheduling = false;
            MasterHooks.close(this.masterHooks.values(), LOG);
            this.masterHooks.clear();
            final CheckpointException shutdownCause = new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
            abortPendingAndQueuedCheckpoints(shutdownCause);
        }
    }

    /** Returns whether {@link #shutdown()} has been called; reads the volatile flag without taking the lock. */
    public boolean isShutdown() {
        return this.shutdown;
    }

    /**
     * Triggers a savepoint.
     *
     * @param str target location handed on to the savepoint trigger; may be null
     * @param savepointFormatType format of the savepoint
     * @return future completed with the savepoint, or exceptionally if triggering fails
     */
    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(@Nullable String str, SavepointFormatType savepointFormatType) {
        final boolean unalignedDisabled = !this.unalignedCheckpointsEnabled;
        final CheckpointProperties savepointProps = CheckpointProperties.forSavepoint(unalignedDisabled, savepointFormatType);
        return triggerSavepointInternal(savepointProps, str);
    }

    /**
     * Triggers a synchronous savepoint.
     *
     * @param z flag forwarded as the second argument of {@code CheckpointProperties.forSyncSavepoint}
     * @param str target location handed on to the savepoint trigger; may be null
     * @param savepointFormatType format of the savepoint
     * @return future completed with the savepoint, or exceptionally if triggering fails
     */
    public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(boolean z, @Nullable String str, SavepointFormatType savepointFormatType) {
        final boolean unalignedDisabled = !this.unalignedCheckpointsEnabled;
        final CheckpointProperties savepointProps = CheckpointProperties.forSyncSavepoint(unalignedDisabled, z, savepointFormatType);
        return triggerSavepointInternal(savepointProps, str);
    }

    /** Validates the savepoint properties and submits a non-periodic trigger via the timer. */
    private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(CheckpointProperties checkpointProperties, @Nullable String str) {
        final CheckpointProperties verifiedProps = Preconditions.checkNotNull(checkpointProperties);
        return triggerCheckpointFromCheckpointThread(verifiedProps, str, false);
    }

    /**
     * Hands the trigger over to the timer executor and returns a future mirroring the outcome
     * of the trigger that is eventually started there.
     */
    private CompletableFuture<CompletedCheckpoint> triggerCheckpointFromCheckpointThread(CheckpointProperties checkpointProperties, String str, boolean z) {
        final CompletableFuture<CompletedCheckpoint> outcome = new CompletableFuture<>();
        final Runnable triggerOnTimer = () ->
                triggerCheckpoint(checkpointProperties, str, z).whenComplete((checkpoint, failure) -> {
                    // Mirror the inner future's result into the one already handed to the caller.
                    if (failure != null) {
                        outcome.completeExceptionally(failure);
                    } else {
                        outcome.complete(checkpoint);
                    }
                });
        this.timer.execute(triggerOnTimer);
        return outcome;
    }

    /**
     * Triggers a checkpoint with the coordinator's default checkpoint properties and no external
     * location.
     *
     * @param z forwarded as the "periodic" flag of the trigger request
     * @return future completed with the checkpoint, or exceptionally if triggering fails
     */
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(boolean z) {
        return triggerCheckpointFromCheckpointThread(this.checkpointProperties, null, z);
    }

    /**
     * Triggers a non-periodic checkpoint of the requested user-facing type.
     *
     * <p>{@code CONFIGURED} keeps the snapshot type of the coordinator's default properties,
     * {@code FULL} maps to {@code CheckpointType.FULL_CHECKPOINT} and {@code INCREMENTAL} maps to
     * {@code CheckpointType.CHECKPOINT}. All other settings are copied from the default
     * checkpoint properties.
     *
     * <p>The switch is written directly over the enum instead of going through the synthetic
     * ordinal table, and the local is declared as {@code SnapshotType}, the type the decompiler
     * reported for this value.
     *
     * @param checkpointType the requested type; must not be null
     * @return future completed with the checkpoint, or exceptionally if triggering fails
     * @throws IllegalArgumentException if {@code checkpointType} is null or not one of the
     *     constants handled above
     */
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(org.apache.flink.core.execution.CheckpointType checkpointType) {
        if (checkpointType == null) {
            throw new IllegalArgumentException("checkpointType cannot be null");
        }
        final SnapshotType snapshotType;
        switch (checkpointType) {
            case CONFIGURED:
                snapshotType = this.checkpointProperties.getCheckpointType();
                break;
            case FULL:
                snapshotType = CheckpointType.FULL_CHECKPOINT;
                break;
            case INCREMENTAL:
                snapshotType = CheckpointType.CHECKPOINT;
                break;
            default:
                throw new IllegalArgumentException("unknown checkpointType: " + checkpointType);
        }
        final CheckpointProperties defaults = this.checkpointProperties;
        final CheckpointProperties requested = new CheckpointProperties(defaults.forceCheckpoint(), snapshotType, defaults.discardOnSubsumed(), defaults.discardOnJobFinished(), defaults.discardOnJobCancelled(), defaults.discardOnJobFailed(), defaults.discardOnJobSuspended(), defaults.isUnclaimed());
        return triggerCheckpointFromCheckpointThread(requested, null, false);
    }

    /**
     * Wraps the arguments into a trigger request, lets {@code chooseRequestToExecute} pick the
     * request to run now (if any) and starts it.
     *
     * @return the completion future of the newly created request
     */
    @VisibleForTesting
    CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointProperties checkpointProperties, @Nullable String str, boolean z) {
        final CheckpointTriggerRequest newRequest = new CheckpointTriggerRequest(checkpointProperties, str, z);
        chooseRequestToExecute(newRequest).ifPresent(chosen -> startTriggeringCheckpoint(chosen));
        return newRequest.getOnCompletionFuture();
    }

    /**
     * Runs the asynchronous trigger pipeline for one request.
     *
     * <p>Stages, in order: calculate the checkpoint plan, allocate a checkpoint ID (on
     * {@code executor}), create and register the PendingCheckpoint (on {@code timer}), initialize
     * the storage location (on {@code executor}), checkpoint the operator coordinators, snapshot
     * the master hooks, and finally either trigger the tasks or report the failure (all on
     * {@code timer}). Any throwable raised synchronously while setting the pipeline up is
     * reported through {@code onTriggerFailure} for the request.
     */
    private void startTriggeringCheckpoint(CheckpointTriggerRequest checkpointTriggerRequest) {
        try {
            synchronized (this.lock) {
                preCheckGlobalState(checkpointTriggerRequest.isPeriodic);
            }
            // Only one trigger pipeline may be in flight at any time.
            Preconditions.checkState(!this.isTriggering);
            this.isTriggering = true;
            long currentTimeMillis = System.currentTimeMillis();
            CompletableFuture<CheckpointPlan> calculateCheckpointPlan = this.checkpointPlanCalculator.calculateCheckpointPlan();
            // z == true: the base locations were not initialized yet, so the location stage must do it.
            boolean z = !this.baseLocationsForCheckpointInitialized;
            this.baseLocationsForCheckpointInitialized = true;
            // Completed (via FutureUtils.forward below) once coordinator and master state are done.
            CompletableFuture completableFuture = new CompletableFuture();
            // Stage 1 + 2: allocate the checkpoint ID, then create the pending checkpoint.
            CompletableFuture thenApplyAsync = calculateCheckpointPlan.thenApplyAsync(checkpointPlan -> {
                try {
                    return new Tuple2(checkpointPlan, Long.valueOf(this.checkpointIdCounter.getAndIncrement()));
                } catch (Throwable th) {
                    throw new CompletionException(th);
                }
            }, this.executor).thenApplyAsync((Function<? super U, ? extends U>) tuple2 -> {
                return createPendingCheckpoint(currentTimeMillis, checkpointTriggerRequest.props, (CheckpointPlan) tuple2.f0, checkpointTriggerRequest.isPeriodic, ((Long) tuple2.f1).longValue(), checkpointTriggerRequest.getOnCompletionFuture(), completableFuture);
            }, (Executor) this.timer);
            // Stage 3 + 4: initialize the storage location, then checkpoint the operator coordinators.
            CompletableFuture thenComposeAsync = thenApplyAsync.thenApplyAsync(pendingCheckpoint -> {
                try {
                    return Tuple2.of(pendingCheckpoint, initializeCheckpointLocation(pendingCheckpoint.getCheckpointID(), checkpointTriggerRequest.props, checkpointTriggerRequest.externalSavepointLocation, z));
                } catch (Throwable th) {
                    throw new CompletionException(th);
                }
            }, this.executor).thenComposeAsync(tuple22 -> {
                PendingCheckpoint pendingCheckpoint2 = (PendingCheckpoint) tuple22.f0;
                // NOTE(review): a null result from a thenCompose function makes the JDK complete
                // the stage exceptionally (NullPointerException); presumably intentional so that a
                // disposed checkpoint ends up in the failure branch of the handler below - confirm.
                if (pendingCheckpoint2.isDisposed()) {
                    return null;
                }
                synchronized (this.lock) {
                    pendingCheckpoint2.setCheckpointTargetLocation((CheckpointStorageLocation) tuple22.f1);
                }
                return OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(this.coordinatorsToCheckpoint, pendingCheckpoint2, this.timer);
            }, (Executor) this.timer);
            // Stage 5: snapshot the master hooks after the coordinators; the combined result of
            // both stages is forwarded into completableFuture.
            FutureUtils.forward(CompletableFuture.allOf(thenComposeAsync.thenComposeAsync(obj -> {
                PendingCheckpoint pendingCheckpoint2 = (PendingCheckpoint) FutureUtils.getWithoutException(thenApplyAsync);
                if (pendingCheckpoint2 == null || pendingCheckpoint2.isDisposed()) {
                    return null;
                }
                return snapshotMasterState(pendingCheckpoint2);
            }, (Executor) this.timer), thenComposeAsync), completableFuture);
            // Final stage: trigger the tasks on success, otherwise report the failure against
            // the pending checkpoint if one was created, else against the request.
            FutureUtils.assertNoException(completableFuture.handleAsync((r11, th) -> {
                PendingCheckpoint pendingCheckpoint2 = (PendingCheckpoint) FutureUtils.getWithoutException(thenApplyAsync);
                Preconditions.checkState((pendingCheckpoint2 == null && th == null) ? false : true, "Either the pending checkpoint needs to be created or an error must have occurred.");
                if (th == null) {
                    triggerCheckpointRequest(checkpointTriggerRequest, currentTimeMillis, pendingCheckpoint2);
                    return null;
                }
                if (pendingCheckpoint2 == null) {
                    onTriggerFailure(checkpointTriggerRequest, th);
                    return null;
                }
                onTriggerFailure(pendingCheckpoint2, th);
                return null;
            }, (Executor) this.timer).exceptionally(th2 -> {
                // Errors of the handler itself are only tolerated (and logged) once the
                // coordinator has been shut down; otherwise they are rethrown.
                if (!isShutdown()) {
                    throw new CompletionException(th2);
                }
                if (ExceptionUtils.findThrowable(th2, RejectedExecutionException.class).isPresent()) {
                    LOG.debug("Execution rejected during shutdown");
                    return null;
                }
                LOG.warn("Error encountered during shutdown", th2);
                return null;
            }));
        } catch (Throwable th3) {
            onTriggerFailure(checkpointTriggerRequest, th3);
        }
    }

    /**
     * Sends the trigger messages for a prepared pending checkpoint.
     *
     * <p>A checkpoint that was disposed in the meantime is reported as a trigger failure. Otherwise
     * the tasks are triggered; if that fails, the checkpoint is aborted on the timer under the
     * coordinator lock. Afterwards the trigger is marked as successful when
     * {@code maybeCompleteCheckpoint} returns true.
     *
     * @param j trigger timestamp forwarded to the tasks
     */
    private void triggerCheckpointRequest(CheckpointTriggerRequest checkpointTriggerRequest, long j, PendingCheckpoint pendingCheckpoint) {
        if (!pendingCheckpoint.isDisposed()) {
            triggerTasks(checkpointTriggerRequest, j, pendingCheckpoint).exceptionally(failure -> {
                LOG.info("Triggering Checkpoint {} for job {} failed due to {}", pendingCheckpoint.getCheckpointID(), this.job, failure);
                final CheckpointException abortCause;
                if (failure instanceof CheckpointException) {
                    abortCause = (CheckpointException) failure;
                } else {
                    abortCause = new CheckpointException(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, failure);
                }
                // The abort itself runs on the timer, guarded by the coordinator lock.
                this.timer.execute(() -> {
                    synchronized (this.lock) {
                        abortPendingCheckpoint(pendingCheckpoint, abortCause);
                    }
                });
                return null;
            });
            if (maybeCompleteCheckpoint(pendingCheckpoint)) {
                onTriggerSuccess();
            }
        } else {
            final CheckpointException disposedCause = new CheckpointException(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, pendingCheckpoint.getFailureCause());
            onTriggerFailure(pendingCheckpoint, disposedCause);
        }
    }

    /**
     * Triggers the checkpoint on every task listed in the checkpoint plan.
     *
     * <p>When {@code forceFullSnapshot} is set and the request is not a savepoint, the checkpoint
     * type is replaced by {@code CheckpointType.FULL_CHECKPOINT}. Synchronous requests use
     * {@code triggerSynchronousSavepoint}, all others {@code triggerCheckpoint}.
     *
     * @param j trigger timestamp forwarded to each task
     * @return future that completes once all per-task trigger futures have completed
     */
    private CompletableFuture<Void> triggerTasks(CheckpointTriggerRequest checkpointTriggerRequest, long j, PendingCheckpoint pendingCheckpoint) {
        final long id = pendingCheckpoint.getCheckpointID();
        final CheckpointProperties requestProps = checkpointTriggerRequest.props;
        final CheckpointOptions options = CheckpointOptions.forConfig(
                (this.forceFullSnapshot && !requestProps.isSavepoint()) ? CheckpointType.FULL_CHECKPOINT : requestProps.getCheckpointType(),
                pendingCheckpoint.getCheckpointStorageLocation().getLocationReference(),
                this.isExactlyOnceMode,
                this.unalignedCheckpointsEnabled,
                this.alignedCheckpointTimeout);
        final List<CompletableFuture<?>> triggerFutures = new ArrayList<>();
        for (Execution task : pendingCheckpoint.getCheckpointPlan().getTasksToTrigger()) {
            triggerFutures.add(requestProps.isSynchronous()
                    ? task.triggerSynchronousSavepoint(id, j, options)
                    : task.triggerCheckpoint(id, j, options));
        }
        return FutureUtils.waitForAll(triggerFutures);
    }

    /**
     * Creates the storage location for one checkpoint or savepoint.
     *
     * @param j checkpoint ID
     * @param str external savepoint location; only used for savepoints
     * @param z whether the base checkpoint locations still have to be initialized first; only
     *     honored for regular checkpoints
     * @throws Exception if the storage view fails to initialize a location
     */
    private CheckpointStorageLocation initializeCheckpointLocation(long j, CheckpointProperties checkpointProperties, @Nullable String str, boolean z) throws Exception {
        if (checkpointProperties.isSavepoint()) {
            return this.checkpointStorageView.initializeLocationForSavepoint(j, str);
        }
        if (z) {
            this.checkpointStorageView.initializeBaseLocationsForCheckpoint();
        }
        return this.checkpointStorageView.initializeLocationForCheckpoint(j);
    }

    /**
     * Creates the PendingCheckpoint for a trigger, registers it and schedules its timeout.
     *
     * @param j trigger timestamp
     * @param z whether the trigger is periodic (used for the global pre-check)
     * @param j2 checkpoint ID
     * @param completableFuture future completed with the finished checkpoint
     * @param completableFuture2 future handed to the pending checkpoint as its last constructor argument
     * @throws CompletionException wrapping any failure of the global pre-check
     */
    private PendingCheckpoint createPendingCheckpoint(long j, CheckpointProperties checkpointProperties, CheckpointPlan checkpointPlan, boolean z, long j2, CompletableFuture<CompletedCheckpoint> completableFuture, CompletableFuture<Void> completableFuture2) {
        try {
            synchronized (this.lock) {
                preCheckGlobalState(z);
            }
        } catch (Throwable preCheckFailure) {
            throw new CompletionException(preCheckFailure);
        }
        final PendingCheckpoint created = new PendingCheckpoint(this.job, j2, j, checkpointPlan, OperatorInfo.getIds(this.coordinatorsToCheckpoint), this.masterHooks.keySet(), checkpointProperties, completableFuture, trackPendingCheckpointStats(j2, checkpointPlan, checkpointProperties, j), completableFuture2);
        synchronized (this.lock) {
            this.pendingCheckpoints.put(j2, created);
            final ScheduledFuture<?> timeoutHandle = this.timer.schedule(new CheckpointCanceller(created), this.checkpointTimeout, TimeUnit.MILLISECONDS);
            final boolean handleAccepted = created.setCancellerHandle(timeoutHandle);
            if (!handleAccepted) {
                // The checkpoint refused the handle, so the timeout task is cancelled again.
                timeoutHandle.cancel(false);
            }
        }
        LOG.info("Triggering checkpoint {} (type={}) @ {} for job {}.", j2, created.getProps().getCheckpointType(), j, this.job);
        return created;
    }

    /**
     * Triggers all registered master hooks for the given checkpoint and collects their state.
     *
     * <p>Each hook is triggered on {@code executor}; its result is processed on {@code timer}
     * under the coordinator lock. The returned future completes normally once the pending
     * checkpoint reports all master states as acknowledged, and exceptionally as soon as a hook
     * fails, the checkpoint turns out to be disposed, or processing a result throws. Results
     * that arrive after the future is done are ignored.
     *
     * <p>The catch parameter is named differently from the callback's throwable parameter; the
     * decompiled code reused the same name for both, which is not legal Java.
     *
     * @param pendingCheckpoint checkpoint that receives the acknowledged master states
     * @return future signalling completion of all master hooks; already completed when no hooks
     *     are registered
     */
    private CompletableFuture<Void> snapshotMasterState(PendingCheckpoint pendingCheckpoint) {
        if (this.masterHooks.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        long checkpointID = pendingCheckpoint.getCheckpointID();
        long checkpointTimestamp = pendingCheckpoint.getCheckpointTimestamp();
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        for (MasterTriggerRestoreHook<?> masterTriggerRestoreHook : this.masterHooks.values()) {
            MasterHooks.triggerHook(masterTriggerRestoreHook, checkpointID, checkpointTimestamp, this.executor).whenCompleteAsync((masterState, th) -> {
                try {
                    synchronized (this.lock) {
                        if (completableFuture.isDone()) {
                            // Another hook already decided the outcome.
                            return;
                        }
                        if (pendingCheckpoint.isDisposed()) {
                            throw new IllegalStateException("Checkpoint " + checkpointID + " has been discarded");
                        }
                        if (th == null) {
                            pendingCheckpoint.acknowledgeMasterState(masterTriggerRestoreHook.getIdentifier(), masterState);
                            if (pendingCheckpoint.areMasterStatesFullyAcknowledged()) {
                                completableFuture.complete(null);
                            }
                        } else {
                            completableFuture.completeExceptionally(th);
                        }
                    }
                } catch (Throwable unexpected) {
                    completableFuture.completeExceptionally(unexpected);
                }
            }, (Executor) this.timer);
        }
        return completableFuture;
    }

    /** Clears the "currently triggering" flag and then runs the next queued trigger request, if any. */
    private void onTriggerSuccess() {
        isTriggering = false;
        executeQueuedRequest();
    }

    /**
     * Handles a trigger failure that happened before any pending checkpoint existed.
     *
     * <p>The request is completed exceptionally with a {@code TRIGGER_CHECKPOINT_FAILURE}
     * exception derived from {@code th}, then the common failure handling runs with no pending
     * checkpoint.
     *
     * @param checkpointTriggerRequest the request that failed
     * @param th the cause of the failure
     */
    private void onTriggerFailure(CheckpointTriggerRequest checkpointTriggerRequest, Throwable th) {
        final CheckpointException failure =
                getCheckpointException(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, th);
        checkpointTriggerRequest.completeExceptionally(failure);

        final PendingCheckpoint noPendingCheckpoint = null;
        onTriggerFailure(noPendingCheckpoint, checkpointTriggerRequest.props, failure);
    }

    /**
     * Handles a trigger failure for an existing pending checkpoint, using that checkpoint's own
     * properties.
     *
     * @param pendingCheckpoint the affected checkpoint; must not be {@code null}
     * @param th the cause of the failure
     * @throws IllegalArgumentException if {@code pendingCheckpoint} is {@code null}
     */
    private void onTriggerFailure(PendingCheckpoint pendingCheckpoint, Throwable th) {
        boolean present = pendingCheckpoint != null;
        Preconditions.checkArgument(present, "Pending checkpoint can not be null.");
        CheckpointProperties props = pendingCheckpoint.getProps();
        onTriggerFailure(pendingCheckpoint, props, th);
    }

    /**
     * Common trigger-failure handling.
     *
     * <p>All operator coordinators are asked to abort their current triggering. A live (non-null,
     * not disposed) pending checkpoint is aborted under the coordinator lock; otherwise the
     * failure is reported to the failure manager directly. In every case, even when the handling
     * itself throws, the triggering flag is cleared and the next queued request is executed.
     *
     * @param pendingCheckpoint the affected checkpoint, or {@code null} if none was created
     * @param checkpointProperties properties of the failed checkpoint
     * @param th the failure; a wrapping {@code CompletionException} is stripped first
     */
    private void onTriggerFailure(@Nullable PendingCheckpoint pendingCheckpoint, CheckpointProperties checkpointProperties, Throwable th) {
        final Throwable cause = ExceptionUtils.stripCompletionException(th);
        try {
            coordinatorsToCheckpoint.forEach(OperatorCoordinatorCheckpointContext::abortCurrentTriggering);

            final CheckpointException failure =
                    getCheckpointException(CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, cause);

            if (pendingCheckpoint != null && !pendingCheckpoint.isDisposed()) {
                synchronized (lock) {
                    abortPendingCheckpoint(pendingCheckpoint, failure);
                }
            } else {
                failureManager.handleCheckpointException(pendingCheckpoint, checkpointProperties, failure, null, job, null, statsTracker);
            }
        } finally {
            isTriggering = false;
            executeQueuedRequest();
        }
    }

    /** Starts triggering the queued request selected by the request decider, if there is one. */
    private void executeQueuedRequest() {
        Optional<CheckpointTriggerRequest> nextRequest = chooseQueuedRequestToExecute();
        nextRequest.ifPresent(this::startTriggeringCheckpoint);
    }

    /**
     * Asks the request decider, under the coordinator lock, which queued request should run next.
     *
     * @return the chosen request, or an empty optional if the decider selects none
     */
    private Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute() {
        synchronized (lock) {
            return requestDecider.chooseQueuedRequestToExecute(isTriggering, lastCheckpointCompletionRelativeTime);
        }
    }

    /**
     * Submits a new request to the request decider, under the coordinator lock, and returns the
     * request the decider wants executed now.
     *
     * @param checkpointTriggerRequest the newly arrived request
     * @return the chosen request, or an empty optional if the decider selects none
     */
    private Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest checkpointTriggerRequest) {
        synchronized (lock) {
            return requestDecider.chooseRequestToExecute(checkpointTriggerRequest, isTriggering, lastCheckpointCompletionRelativeTime);
        }
    }

    /**
     * Completes the pending checkpoint if it is already fully acknowledged.
     *
     * @param pendingCheckpoint the checkpoint to inspect
     * @return {@code true} if the checkpoint is not yet fully acknowledged or was completed;
     *     {@code false} if the coordinator is shut down or completion failed (in which case the
     *     trigger-failure handling has been run)
     */
    private boolean maybeCompleteCheckpoint(PendingCheckpoint pendingCheckpoint) {
        synchronized (lock) {
            if (!pendingCheckpoint.isFullyAcknowledged()) {
                // Nothing to do yet; acknowledgements are still outstanding.
                return true;
            }
            if (shutdown) {
                return false;
            }
            try {
                completePendingCheckpoint(pendingCheckpoint);
            } catch (CheckpointException completionFailure) {
                onTriggerFailure(pendingCheckpoint, completionFailure);
                return false;
            }
            return true;
        }
    }

    /**
     * Processes a decline message sent by a task for a checkpoint.
     *
     * <p>If the referenced checkpoint is still pending it is aborted with the exception carried by
     * the message. Messages for checkpoints that are no longer pending are only logged at debug
     * level. Nothing happens when the coordinator is shut down or the message is {@code null}.
     *
     * @param declineCheckpoint the decline message; {@code null} is ignored
     * @param str description of the sender, used in log and error messages
     * @throws IllegalArgumentException if the message belongs to a different job
     */
    public void receiveDeclineMessage(DeclineCheckpoint declineCheckpoint, String str) {
        if (shutdown || declineCheckpoint == null) {
            return;
        }
        if (!job.equals(declineCheckpoint.getJob())) {
            throw new IllegalArgumentException("Received DeclineCheckpoint message for job " + declineCheckpoint.getJob() + " from " + str + " while this coordinator handles job " + job);
        }

        final long checkpointId = declineCheckpoint.getCheckpointId();
        final CheckpointException declineReason = declineCheckpoint.getSerializedCheckpointException().unwrap();
        final String reasonMessage = declineReason.getMessage();

        synchronized (lock) {
            // Re-check under the lock: a shutdown may have happened in the meantime.
            if (shutdown) {
                return;
            }

            final PendingCheckpoint pending = pendingCheckpoints.get(checkpointId);
            if (pending == null) {
                if (LOG.isDebugEnabled()) {
                    final String format =
                            recentPendingCheckpoints.contains(checkpointId)
                                    ? "Received another decline message for now expired checkpoint attempt {} from task {} of job {} at {} : {}"
                                    : "Received decline message for unknown (too old?) checkpoint attempt {} from task {} of job {} at {} : {}";
                    LOG.debug(format, checkpointId, declineCheckpoint.getTaskExecutionId(), job, str, reasonMessage);
                }
                return;
            }

            Preconditions.checkState(!pending.isDisposed(), "Received message for discarded but non-removed checkpoint " + checkpointId);
            LOG.info("Decline checkpoint {} by task {} of job {} at {}.", checkpointId, declineCheckpoint.getTaskExecutionId(), job, str, declineReason.getCause());
            abortPendingCheckpoint(pending, declineReason, declineCheckpoint.getTaskExecutionId());
        }
    }

    /**
     * Processes an acknowledge message sent by a task for a checkpoint.
     *
     * <p>Shared state carried by the message is registered with the shared state registry unless
     * the message refers to a pending savepoint. If the checkpoint is still pending, the
     * acknowledgement is recorded and the checkpoint is completed once fully acknowledged. If it
     * is no longer pending, the reported stats are recorded and the subtask state is discarded.
     *
     * @param acknowledgeCheckpoint the acknowledge message; {@code null} is ignored
     * @param str description of the sender, used in log messages
     * @return {@code false} if the coordinator is shut down, the message is {@code null} or for
     *     another job, or the checkpoint is neither pending nor among the recently remembered
     *     checkpoint ids; {@code true} otherwise
     * @throws CheckpointException if completing the fully acknowledged checkpoint fails
     * @throws IllegalStateException if the checkpoint is disposed but still registered as pending
     */
    public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint acknowledgeCheckpoint, String str) throws CheckpointException {
        if (shutdown || acknowledgeCheckpoint == null) {
            return false;
        }
        if (!job.equals(acknowledgeCheckpoint.getJob())) {
            LOG.error("Received wrong AcknowledgeCheckpoint message for job {} from {} : {}", job, str, acknowledgeCheckpoint);
            return false;
        }

        final long checkpointId = acknowledgeCheckpoint.getCheckpointId();

        synchronized (lock) {
            // Re-check under the lock: a shutdown may have happened in the meantime.
            if (shutdown) {
                return false;
            }

            final PendingCheckpoint pendingCheckpoint = pendingCheckpoints.get(checkpointId);

            if (acknowledgeCheckpoint.getSubtaskState() != null
                    && (pendingCheckpoint == null || !pendingCheckpoint.getProps().isSavepoint())) {
                acknowledgeCheckpoint.getSubtaskState().registerSharedStates(completedCheckpointStore.getSharedStateRegistry(), checkpointId);
            }

            if (pendingCheckpoint != null && pendingCheckpoint.isDisposed()) {
                throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
            }

            if (pendingCheckpoint == null) {
                reportStats(acknowledgeCheckpoint.getCheckpointId(), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getCheckpointMetrics());

                final boolean wasRecentlyPending = recentPendingCheckpoints.contains(checkpointId);
                if (wasRecentlyPending) {
                    LOG.warn("Received late message for now expired checkpoint attempt {} from task {} of job {} at {}.", checkpointId, acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob(), str);
                } else {
                    LOG.debug("Received message for an unknown checkpoint {} from task {} of job {} at {}.", checkpointId, acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob(), str);
                }

                discardSubtaskState(acknowledgeCheckpoint.getJob(), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getCheckpointId(), acknowledgeCheckpoint.getSubtaskState());
                return wasRecentlyPending;
            }

            switch (pendingCheckpoint.acknowledgeTask(acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getSubtaskState(), acknowledgeCheckpoint.getCheckpointMetrics())) {
                case SUCCESS:
                    LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {} at {}.", checkpointId, acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob(), str);
                    if (pendingCheckpoint.isFullyAcknowledged()) {
                        completePendingCheckpoint(pendingCheckpoint);
                    }
                    break;
                case DUPLICATE:
                    LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}, location {}.", acknowledgeCheckpoint.getCheckpointId(), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob(), str);
                    break;
                case UNKNOWN:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, because the task's execution attempt id was unknown. Discarding the state handle to avoid lingering state.", acknowledgeCheckpoint.getCheckpointId(), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob(), str);
                    discardSubtaskState(acknowledgeCheckpoint.getJob(), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getCheckpointId(), acknowledgeCheckpoint.getSubtaskState());
                    break;
                case DISCARDED:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {} at {}, because the pending checkpoint had been discarded. Discarding the state handle to avoid lingering state.", acknowledgeCheckpoint.getCheckpointId(), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob(), str);
                    discardSubtaskState(acknowledgeCheckpoint.getJob(), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getCheckpointId(), acknowledgeCheckpoint.getSubtaskState());
                    break;
            }
            return true;
        }
    }

    /**
     * Turns a fully acknowledged pending checkpoint into a completed one.
     *
     * <p>The shared state registry is told that the checkpoint completed, the checkpoint is
     * finalized, non-savepoints are added to the completed checkpoint store, the completion future
     * is completed, stats are reported and post-completion cleanup runs. The checkpoint is removed
     * from {@code pendingCheckpoints} and a trigger request is scheduled on both the success and
     * the failure path.
     *
     * <p>NOTE(review): the nested try blocks look like a decompiled try/catch/finally — confirm
     * against the original source before restructuring.
     *
     * @param pendingCheckpoint the checkpoint to complete
     * @throws CheckpointException if finalizing or storing the checkpoint fails
     */
    private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
        long checkpointID = pendingCheckpoint.getCheckpointID();
        CheckpointProperties props = pendingCheckpoint.getProps();
        this.completedCheckpointStore.getSharedStateRegistry().checkpointCompleted(checkpointID);
        try {
            try {
                CompletedCheckpoint finalizeCheckpoint = finalizeCheckpoint(pendingCheckpoint);
                // Finalizing must both yield a result and dispose the pending checkpoint.
                Preconditions.checkState(pendingCheckpoint.isDisposed() && finalizeCheckpoint != null);
                // Savepoints are not added to the completed checkpoint store.
                CompletedCheckpoint addCompletedCheckpointToStoreAndSubsumeOldest = !props.isSavepoint() ? addCompletedCheckpointToStoreAndSubsumeOldest(checkpointID, finalizeCheckpoint, pendingCheckpoint) : null;
                pendingCheckpoint.getCompletionFuture().complete(finalizeCheckpoint);
                reportCompletedCheckpoint(finalizeCheckpoint);
                this.pendingCheckpoints.remove(Long.valueOf(checkpointID));
                scheduleTriggerRequest();
                cleanupAfterCompletedCheckpoint(pendingCheckpoint, checkpointID, finalizeCheckpoint, addCompletedCheckpointToStoreAndSubsumeOldest, props);
            } catch (Exception e) {
                // Propagate the failure to anyone waiting on the completion future, then rethrow.
                pendingCheckpoint.getCompletionFuture().completeExceptionally(e);
                throw e;
            }
        } catch (Throwable th) {
            // Failure path: still unregister the checkpoint and allow the next trigger.
            this.pendingCheckpoints.remove(Long.valueOf(checkpointID));
            scheduleTriggerRequest();
            throw th;
        }
    }

    /**
     * Logs size and duration of a completed checkpoint at trace level and forwards its statistics
     * to the stats tracker. Does nothing when the checkpoint carries no statistics.
     *
     * @param completedCheckpoint the checkpoint that just completed
     */
    private void reportCompletedCheckpoint(CompletedCheckpoint completedCheckpoint) {
        final CompletedCheckpointStats stats = completedCheckpoint.getStatistic();
        if (stats == null) {
            return;
        }

        final long checkpointId = completedCheckpoint.getCheckpointID();
        final long stateSize = stats.getStateSize();
        final long stateSizeKb = stateSize == 0 ? 0L : stateSize / 1024;
        LOG.trace("Checkpoint {} size: {}Kb, duration: {}ms", checkpointId, stateSizeKb, stats.getEndToEndDuration());

        statsTracker.reportCompletedCheckpoint(stats);
    }

    /**
     * Bookkeeping after a checkpoint completed.
     *
     * <p>The id is remembered as recently pending, the last-completion time is updated and the
     * checkpoint is logged. Unless the checkpoint is a non-synchronous savepoint, older
     * subsumable pending checkpoints are dropped and completion notifications are sent.
     *
     * @param pendingCheckpoint the pending checkpoint that was completed
     * @param j id of the completed checkpoint
     * @param completedCheckpoint the resulting completed checkpoint
     * @param completedCheckpoint2 checkpoint returned when adding to the store, possibly
     *     {@code null}; used to derive the subsumed id sent with the notifications
     * @param checkpointProperties properties of the completed checkpoint
     */
    private void cleanupAfterCompletedCheckpoint(PendingCheckpoint pendingCheckpoint, long j, CompletedCheckpoint completedCheckpoint, CompletedCheckpoint completedCheckpoint2, CheckpointProperties checkpointProperties) {
        rememberRecentCheckpointId(j);
        lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
        logCheckpointInfo(completedCheckpoint);

        final boolean nonSynchronousSavepoint =
                checkpointProperties.isSavepoint() && !checkpointProperties.isSynchronous();
        if (nonSynchronousSavepoint) {
            return;
        }

        dropSubsumedCheckpoints(j);
        sendAcknowledgeMessages(
                pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
                j,
                completedCheckpoint.getTimestamp(),
                extractIdIfDiscardedOnSubsumed(completedCheckpoint2));
    }

    /**
     * Logs a summary of the completed checkpoint at info level and, if debug logging is enabled,
     * the string form of every operator state it contains.
     *
     * <p>The operator states are joined with {@code ", "}. A checkpoint without operator states
     * logs just the {@code "Checkpoint state: "} prefix; the earlier truncation-based formatting
     * would have cut two characters off the prefix in that case.
     *
     * @param completedCheckpoint the checkpoint to describe
     */
    private void logCheckpointInfo(CompletedCheckpoint completedCheckpoint) {
        LOG.info(
                "Completed checkpoint {} for job {} ({} bytes, checkpointDuration={} ms, finalizationTime={} ms).",
                completedCheckpoint.getCheckpointID(),
                job,
                completedCheckpoint.getStateSize(),
                completedCheckpoint.getCompletionTimestamp() - completedCheckpoint.getTimestamp(),
                System.currentTimeMillis() - completedCheckpoint.getCompletionTimestamp());

        if (LOG.isDebugEnabled()) {
            String operatorStates =
                    completedCheckpoint.getOperatorStates().values().stream()
                            .map(String::valueOf)
                            .collect(Collectors.joining(", "));
            LOG.debug("Checkpoint state: {}", operatorStates);
        }
    }

    /**
     * Finalizes the pending checkpoint and reports the success to the failure manager.
     *
     * <p>On failure the pending checkpoint is aborted (unless already disposed) and a
     * {@link CheckpointException} is thrown. The failure reason is
     * {@code CHECKPOINT_DECLINED_TASK_CLOSING} for a
     * {@code PartialFinishingNotSupportedByStateException} and
     * {@code FINALIZE_CHECKPOINT_FAILURE} for anything else.
     *
     * @param pendingCheckpoint the checkpoint to finalize
     * @return the completed checkpoint
     * @throws CheckpointException if finalization fails
     */
    private CompletedCheckpoint finalizeCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
        try {
            final CompletedCheckpoint completed =
                    pendingCheckpoint.finalizeCheckpoint(checkpointsCleaner, this::scheduleTriggerRequest, executor);
            failureManager.handleCheckpointSuccess(pendingCheckpoint.getCheckpointID());
            return completed;
        } catch (Exception e) {
            final CheckpointFailureReason reason;
            if (e instanceof FinishedTaskStateProvider.PartialFinishingNotSupportedByStateException) {
                reason = CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING;
            } else {
                reason = CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE;
            }

            if (!pendingCheckpoint.isDisposed()) {
                abortPendingCheckpoint(pendingCheckpoint, new CheckpointException(reason, e));
            }
            throw new CheckpointException("Could not finalize the pending checkpoint " + pendingCheckpoint.getCheckpointID() + '.', reason, e);
        }
    }

    /**
     * Returns the id of the given checkpoint if its properties say it is discarded on subsumption.
     *
     * @param completedCheckpoint the checkpoint to inspect; may be {@code null}
     * @return the checkpoint id, or {@code -1} if the checkpoint is {@code null} or is not
     *     discarded on subsumption
     */
    private long extractIdIfDiscardedOnSubsumed(CompletedCheckpoint completedCheckpoint) {
        if (completedCheckpoint != null && completedCheckpoint.getProperties().discardOnSubsumed()) {
            return completedCheckpoint.getCheckpointID();
        }
        return -1L;
    }

    /**
     * Adds the completed checkpoint to the completed checkpoint store.
     *
     * <p>On success the force-full-snapshot flag is cleared. On failure the completion future is
     * failed, the checkpoint data is cleaned up (except after a
     * {@code PossibleInconsistentStateException}, where it is kept and a warning is logged), the
     * failure is reported to the stats tracker and abort notifications are sent.
     *
     * @param j id of the checkpoint
     * @param completedCheckpoint the checkpoint to store
     * @param pendingCheckpoint the pending checkpoint it was created from
     * @return the value returned by the store's {@code addCheckpointAndSubsumeOldestOne}
     * @throws CheckpointException with reason {@code FINALIZE_CHECKPOINT_FAILURE} if storing fails
     */
    private CompletedCheckpoint addCompletedCheckpointToStoreAndSubsumeOldest(long j, CompletedCheckpoint completedCheckpoint, PendingCheckpoint pendingCheckpoint) throws CheckpointException {
        final List<ExecutionVertex> tasksToNotify = pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo();
        try {
            final CompletedCheckpoint subsumed =
                    completedCheckpointStore.addCheckpointAndSubsumeOldestOne(completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
            forceFullSnapshot = false;
            return subsumed;
        } catch (Exception e) {
            pendingCheckpoint.getCompletionFuture().completeExceptionally(e);

            final boolean persistenceUncertain = e instanceof PossibleInconsistentStateException;
            if (!persistenceUncertain) {
                checkpointsCleaner.cleanCheckpointOnFailedStoring(completedCheckpoint, executor);
            } else {
                LOG.warn("An error occurred while writing checkpoint {} to the underlying metadata store. Flink was not able to determine whether the metadata was successfully persisted. The corresponding state located at '{}' won't be discarded and needs to be cleaned up manually.", completedCheckpoint.getCheckpointID(), completedCheckpoint.getExternalPointer());
            }

            reportFailedCheckpoint(j, e);
            sendAbortedMessages(tasksToNotify, j, completedCheckpoint.getTimestamp());
            throw new CheckpointException("Could not complete the pending checkpoint " + j + '.', CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE, e);
        }
    }

    /**
     * Reports a failed checkpoint to the stats tracker, if the tracker still has pending stats
     * for it.
     *
     * @param j id of the failed checkpoint
     * @param exc the failure cause
     */
    private void reportFailedCheckpoint(long j, Exception exc) {
        final PendingCheckpointStats pendingStats = statsTracker.getPendingCheckpointStats(j);
        if (pendingStats == null) {
            return;
        }
        statsTracker.reportFailedCheckpoint(pendingStats.toFailedCheckpoint(System.currentTimeMillis(), exc));
    }

    /**
     * Submits execution of the next queued trigger request to the timer, unless the coordinator
     * is shut down (in which case only a debug message is logged).
     */
    void scheduleTriggerRequest() {
        synchronized (lock) {
            if (!isShutdown()) {
                timer.execute(this::executeQueuedRequest);
                return;
            }
            LOG.debug("Skip scheduling trigger request because the CheckpointCoordinator is shut down");
        }
    }

    /**
     * Notifies tasks and operator coordinators that a checkpoint completed.
     *
     * @param list vertices whose current execution attempt (if any) is notified
     * @param j id of the completed checkpoint
     * @param j2 second value forwarded to {@code notifyCheckpointOnComplete}; callers in this
     *     class pass the checkpoint timestamp
     * @param j3 third value forwarded to {@code notifyCheckpointOnComplete}; callers in this
     *     class pass the id of the subsumed checkpoint, or {@code -1}
     */
    @VisibleForTesting
    void sendAcknowledgeMessages(List<ExecutionVertex> list, long j, long j2, long j3) {
        for (ExecutionVertex vertex : list) {
            final Execution attempt = vertex.getCurrentExecutionAttempt();
            if (attempt != null) {
                attempt.notifyCheckpointOnComplete(j, j2, j3);
            }
        }

        for (OperatorCoordinatorCheckpointContext coordinator : coordinatorsToCheckpoint) {
            coordinator.notifyCheckpointComplete(j);
        }
    }

    /**
     * Notifies tasks and operator coordinators that a checkpoint was aborted.
     *
     * <p>Must be called while holding the coordinator lock (checked by an assertion). The task
     * notifications are sent asynchronously on the executor; the operator coordinators are
     * notified synchronously on the calling thread.
     *
     * @param list vertices whose current execution attempt (if any) is notified
     * @param j id of the aborted checkpoint
     * @param j2 third value forwarded to {@code notifyCheckpointAborted}; the caller in this
     *     class passes the checkpoint timestamp
     */
    private void sendAbortedMessages(List<ExecutionVertex> list, long j, long j2) {
        assert Thread.holdsLock(lock);

        // Read the latest completed id now, under the lock, rather than later on the executor.
        final long latestCompletedCheckpointId = completedCheckpointStore.getLatestCheckpointId();

        executor.execute(
                () -> {
                    for (ExecutionVertex vertex : list) {
                        Execution attempt = vertex.getCurrentExecutionAttempt();
                        if (attempt != null) {
                            attempt.notifyCheckpointAborted(j, latestCompletedCheckpointId, j2);
                        }
                    }
                });

        for (OperatorCoordinatorCheckpointContext coordinator : coordinatorsToCheckpoint) {
            coordinator.notifyCheckpointAborted(j);
        }
    }

    /**
     * Aborts every pending checkpoint that the given execution attempt has not yet acknowledged,
     * using a {@code TASK_FAILURE} checkpoint exception.
     *
     * @param executionAttemptID the execution attempt to check acknowledgements for
     * @param th cause attached to the checkpoint exception
     */
    public void failUnacknowledgedPendingCheckpointsFor(ExecutionAttemptID executionAttemptID, Throwable th) {
        synchronized (lock) {
            abortPendingCheckpoints(
                    pending -> !pending.isAcknowledgedBy(executionAttemptID),
                    new CheckpointException(CheckpointFailureReason.TASK_FAILURE, th));
        }
    }

    /**
     * Appends a checkpoint id to the bounded history of recently pending checkpoints, evicting
     * the oldest entry first when the history already holds 16 or more ids.
     *
     * @param j the checkpoint id to remember
     */
    private void rememberRecentCheckpointId(long j) {
        final int maxRemembered = 16;
        final boolean historyFull = recentPendingCheckpoints.size() >= maxRemembered;
        if (historyFull) {
            recentPendingCheckpoints.removeFirst();
        }
        recentPendingCheckpoints.addLast(j);
    }

    /**
     * Aborts, with reason {@code CHECKPOINT_SUBSUMED}, all pending checkpoints that have a smaller
     * id than the given one and report that they can be subsumed.
     *
     * @param j id of the checkpoint that subsumes the older ones
     */
    private void dropSubsumedCheckpoints(long j) {
        final Predicate<PendingCheckpoint> olderAndSubsumable =
                pending -> pending.getCheckpointID() < j && pending.canBeSubsumed();
        abortPendingCheckpoints(olderAndSubsumable, new CheckpointException(CheckpointFailureReason.CHECKPOINT_SUBSUMED));
    }

    /**
     * Restores the latest checkpointed state to the given tasks, skipping the operator
     * coordinators. A missing checkpoint is not an error, and the finished-state validation is
     * not run.
     *
     * @param set the job vertices to restore
     * @return the id of the restored checkpoint, or empty if no checkpoint was available
     * @throws Exception if the restore fails
     */
    public OptionalLong restoreLatestCheckpointedStateToSubtasks(Set<ExecutionJobVertex> set) throws Exception {
        final boolean errorIfNoCheckpoint = false;
        final boolean allowNonRestoredState = true;
        final boolean checkFinishedState = false;
        return restoreLatestCheckpointedStateInternal(set, OperatorCoordinatorRestoreBehavior.SKIP, errorIfNoCheckpoint, allowNonRestoredState, checkFinishedState);
    }

    /**
     * Restores the latest checkpointed state to the given tasks and restores or resets the
     * operator coordinators. A missing checkpoint is not an error, and the finished-state
     * validation is not run.
     *
     * @param set the job vertices to restore
     * @param z flag forwarded to the state assignment and to the master hook restore
     * @return {@code true} if a checkpoint was restored, {@code false} if none was available
     * @throws Exception if the restore fails
     */
    public boolean restoreLatestCheckpointedStateToAll(Set<ExecutionJobVertex> set, boolean z) throws Exception {
        final boolean errorIfNoCheckpoint = false;
        final boolean checkFinishedState = false;
        final OptionalLong restoredId =
                restoreLatestCheckpointedStateInternal(set, OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET, errorIfNoCheckpoint, z, checkFinishedState);
        return restoredId.isPresent();
    }

    /**
     * Restores the latest checkpoint, if one is present, to the given tasks. Operator
     * coordinators are restored only when a checkpoint exists, and the finished-state validation
     * is run.
     *
     * @param set the job vertices to restore
     * @return {@code true} if a checkpoint was restored, {@code false} if none was available
     * @throws Exception if the restore fails
     */
    public boolean restoreInitialCheckpointIfPresent(Set<ExecutionJobVertex> set) throws Exception {
        final boolean errorIfNoCheckpoint = false;
        final boolean allowNonRestoredState = false;
        final boolean checkFinishedState = true;
        final OptionalLong restoredId =
                restoreLatestCheckpointedStateInternal(set, OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT, errorIfNoCheckpoint, allowNonRestoredState, checkFinishedState);
        return restoredId.isPresent();
    }

    /**
     * Restores the latest completed checkpoint from the store, under the coordinator lock.
     *
     * <p>Without a checkpoint the master hooks are reset and, for {@code RESTORE_OR_RESET}, the
     * operator coordinators are reset to an empty state. With a checkpoint, its operator states
     * are assigned to the tasks, the master hooks are restored, the operator coordinators are
     * restored (unless the behavior is {@code SKIP}) and the restore is reported to the stats
     * tracker.
     *
     * @param set the job vertices to restore
     * @param operatorCoordinatorRestoreBehavior how operator coordinators are treated
     * @param z if {@code true}, a missing checkpoint raises an {@link IllegalStateException}
     * @param z2 flag forwarded to the state assignment and to the master hook restore
     * @param z3 if {@code true}, the finished state of the operators is validated first
     * @return the id of the restored checkpoint, or empty if no checkpoint was available
     * @throws IllegalStateException if the coordinator is shut down, or no checkpoint exists while
     *     {@code z} is set
     * @throws Exception if any restore step fails
     */
    private OptionalLong restoreLatestCheckpointedStateInternal(Set<ExecutionJobVertex> set, OperatorCoordinatorRestoreBehavior operatorCoordinatorRestoreBehavior, boolean z, boolean z2, boolean z3) throws Exception {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalStateException("CheckpointCoordinator is shut down");
            }

            final CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();

            if (latest == null) {
                LOG.info("No checkpoint found during restore.");
                if (z) {
                    throw new IllegalStateException("No completed checkpoint available");
                }

                LOG.debug("Resetting the master hooks.");
                MasterHooks.reset(masterHooks.values(), LOG);

                if (operatorCoordinatorRestoreBehavior == OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET) {
                    LOG.info("Resetting the Operator Coordinators to an empty state.");
                    restoreStateToCoordinators(-1L, Collections.emptyMap());
                }
                return OptionalLong.empty();
            }

            LOG.info("Restoring job {} from {}.", job, latest);
            forceFullSnapshot = latest.getProperties().isUnclaimed();

            final Map<OperatorID, OperatorState> operatorStates = extractOperatorStates(latest);
            if (z3) {
                vertexFinishedStateCheckerFactory.apply(set, operatorStates).validateOperatorsFinishedState();
            }

            final StateAssignmentOperation assignment =
                    new StateAssignmentOperation(latest.getCheckpointID(), set, operatorStates, z2);
            assignment.assignStates();

            MasterHooks.restoreMasterHooks(masterHooks, latest.getMasterHookStates(), latest.getCheckpointID(), z2, LOG);

            if (operatorCoordinatorRestoreBehavior != OperatorCoordinatorRestoreBehavior.SKIP) {
                restoreStateToCoordinators(latest.getCheckpointID(), operatorStates);
            }

            final RestoredCheckpointStats restoredStats =
                    new RestoredCheckpointStats(latest.getCheckpointID(), latest.getProperties(), System.currentTimeMillis(), latest.getExternalPointer());
            statsTracker.reportRestoredCheckpoint(restoredStats);

            return OptionalLong.of(latest.getCheckpointID());
        }
    }

    /**
     * Returns the operator states of the given checkpoint.
     *
     * <p>If the checkpoint's id equals {@code checkpointIdOfIgnoredInFlightData}, a new map is
     * returned whose values are copies produced by {@code copyAndDiscardInFlightData()};
     * otherwise the checkpoint's own map is returned unchanged.
     *
     * @param completedCheckpoint the checkpoint to read the operator states from
     * @return the operator states keyed by operator id
     */
    private Map<OperatorID, OperatorState> extractOperatorStates(CompletedCheckpoint completedCheckpoint) {
        final Map<OperatorID, OperatorState> original = completedCheckpoint.getOperatorStates();

        final boolean ignoreInFlightData =
                completedCheckpoint.getCheckpointID() == checkpointIdOfIgnoredInFlightData;
        if (!ignoreInFlightData) {
            return original;
        }

        final Map<OperatorID, OperatorState> withoutInFlightData = new HashMap<>();
        for (OperatorState state : original.values()) {
            withoutInFlightData.put(state.getOperatorID(), state.copyAndDiscardInFlightData());
        }
        return withoutInFlightData;
    }

    /**
     * Restores the job from the savepoint described by the given settings.
     *
     * <p>The savepoint is resolved, loaded and validated with checkpoint properties chosen by the
     * restore mode. Its shared states are registered, it is added to the completed checkpoint
     * store, the checkpoint id counter is set to the savepoint's id plus one, and finally the
     * state is restored to the tasks.
     *
     * @param savepointRestoreSettings restore path, restore mode and non-restored-state policy
     * @param map the job vertices, keyed by vertex id
     * @param classLoader class loader used while loading the savepoint
     * @return {@code true} if state was restored
     * @throws NullPointerException presumably, if the restore path is {@code null} (depends on
     *     {@code Preconditions.checkNotNull} — confirm)
     * @throws IllegalArgumentException if the restore mode is not one of the handled values
     * @throws Exception if loading or restoring fails
     */
    public boolean restoreSavepoint(SavepointRestoreSettings savepointRestoreSettings, Map<JobVertexID, ExecutionJobVertex> map, ClassLoader classLoader) throws Exception {
        final String savepointPath = savepointRestoreSettings.getRestorePath();
        final boolean allowNonRestored = savepointRestoreSettings.allowNonRestoredState();
        Preconditions.checkNotNull(savepointPath, "The savepoint path cannot be null.");

        LOG.info("Starting job {} from savepoint {} ({})", job, savepointPath, allowNonRestored ? "allowing non restored state" : "");

        final CompletedCheckpointStorageLocation location = checkpointStorageView.resolveCheckpoint(savepointPath);

        final CheckpointProperties restoreProperties;
        switch (savepointRestoreSettings.getRestoreMode()) {
            case CLAIM:
                restoreProperties = this.checkpointProperties;
                break;
            case LEGACY:
                restoreProperties = CheckpointProperties.forSavepoint(false, SavepointFormatType.CANONICAL);
                break;
            case NO_CLAIM:
                restoreProperties = CheckpointProperties.forUnclaimedSnapshot();
                break;
            default:
                throw new IllegalArgumentException("Unknown snapshot restore mode");
        }

        final CompletedCheckpoint savepoint =
                Checkpoints.loadAndValidateCheckpoint(job, map, location, classLoader, allowNonRestored, restoreProperties);
        savepoint.registerSharedStatesAfterRestored(completedCheckpointStore.getSharedStateRegistry(), savepointRestoreSettings.getRestoreMode());
        completedCheckpointStore.addCheckpointAndSubsumeOldestOne(savepoint, checkpointsCleaner, this::scheduleTriggerRequest);

        // Continue numbering new checkpoints right after the restored savepoint.
        final long nextCheckpointId = savepoint.getCheckpointID() + 1;
        checkpointIdCounter.setCount(nextCheckpointId);
        LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId);

        final OptionalLong restoredId =
                restoreLatestCheckpointedStateInternal(new HashSet<>(map.values()), OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT, true, allowNonRestored, true);
        return restoredId.isPresent();
    }

    /**
     * Returns the number of currently pending checkpoints, read under the coordinator lock.
     *
     * @return the size of the pending checkpoint map
     */
    public int getNumberOfPendingCheckpoints() {
        synchronized (lock) {
            return pendingCheckpoints.size();
        }
    }

    /**
     * Returns the number of checkpoints retained by the completed checkpoint store, read under
     * the coordinator lock.
     *
     * @return the store's retained checkpoint count
     */
    public int getNumberOfRetainedSuccessfulCheckpoints() {
        synchronized (lock) {
            return completedCheckpointStore.getNumberOfRetainedCheckpoints();
        }
    }

    /**
     * Returns a snapshot of the pending checkpoints, copied under the coordinator lock.
     *
     * @return a new map from checkpoint id to pending checkpoint; modifying the returned map does
     *     not affect the coordinator's own map
     */
    public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
        synchronized (lock) {
            return new HashMap<>(pendingCheckpoints);
        }
    }

    /**
     * Returns all checkpoints held by the completed checkpoint store, read under the coordinator
     * lock.
     *
     * @return the list returned by the store's {@code getAllCheckpoints()}
     * @throws Exception if the store fails to provide the checkpoints
     */
    public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
        synchronized (lock) {
            return completedCheckpointStore.getAllCheckpoints();
        }
    }

    /**
     * Returns the coordinator's view of the checkpoint storage.
     *
     * @return the checkpoint storage view held by this coordinator
     */
    public CheckpointStorageCoordinatorView getCheckpointStorage() {
        return checkpointStorageView;
    }

    /**
     * Returns the store that holds the completed checkpoints.
     *
     * @return the completed checkpoint store held by this coordinator
     */
    public CompletedCheckpointStore getCheckpointStore() {
        return completedCheckpointStore;
    }

    /**
     * Returns the checkpoint timeout; the value is used in milliseconds when scheduling the
     * checkpoint canceller.
     *
     * @return the configured checkpoint timeout
     */
    public long getCheckpointTimeout() {
        return checkpointTimeout;
    }

    /**
     * Returns the request decider's trigger request queue, read under the coordinator lock.
     *
     * @return the queue object handed out by the request decider
     */
    @VisibleForTesting
    @Deprecated
    PriorityQueue<CheckpointTriggerRequest> getTriggerRequestQueue() {
        synchronized (lock) {
            return requestDecider.getTriggerRequestQueue();
        }
    }

    /**
     * Tells whether a checkpoint trigger is currently in progress.
     *
     * @return the current value of the triggering flag (read without taking the lock)
     */
    public boolean isTriggering() {
        return isTriggering;
    }

    /**
     * Tells whether a periodic trigger is currently scheduled.
     *
     * @return {@code true} if a periodic trigger handle is set
     */
    @VisibleForTesting
    boolean isCurrentPeriodicTriggerAvailable() {
        boolean noTrigger = currentPeriodicTrigger == null;
        return !noTrigger;
    }

    /**
     * Tells whether periodic checkpointing is configured, i.e. whether the base interval differs
     * from the sentinel value.
     *
     * <p>NOTE(review): the sentinel {@code CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT} looks
     * like a constant substituted by value during decompilation — confirm which constant the
     * original source compares against.
     *
     * @return {@code true} if the base interval is not the sentinel value
     */
    public boolean isPeriodicCheckpointingConfigured() {
        boolean intervalIsSentinel = baseInterval == CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT;
        return !intervalIsSentinel;
    }

    /**
     * Starts periodic checkpoint scheduling.
     *
     * <p>Any running scheduler is stopped first (which also aborts pending and queued
     * checkpoints); then a new periodic trigger is scheduled with a random initial delay.
     *
     * @throws IllegalArgumentException if the coordinator is shut down
     * @throws IllegalStateException presumably, if no periodic checkpointing is configured
     *     (depends on {@code Preconditions.checkState} — confirm)
     */
    public void startCheckpointScheduler() {
        synchronized (lock) {
            if (shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            Preconditions.checkState(isPeriodicCheckpointingConfigured(), "Can not start checkpoint scheduler, if no periodic checkpointing is configured");

            // Make sure everything from a previous run is cleaned up before restarting.
            stopCheckpointScheduler();

            periodicScheduling = true;
            long initialDelay = getRandomInitDelay();
            currentPeriodicTrigger = scheduleTriggerWithDelay(initialDelay);
        }
    }

    /**
     * Stops periodic checkpoint scheduling: clears the scheduling flag, cancels the periodic
     * trigger and aborts all pending and queued checkpoints with reason
     * {@code CHECKPOINT_COORDINATOR_SUSPEND}.
     */
    public void stopCheckpointScheduler() {
        synchronized (lock) {
            periodicScheduling = false;
            cancelPeriodicTrigger();

            CheckpointException suspended =
                    new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SUSPEND);
            abortPendingAndQueuedCheckpoints(suspended);
        }
    }

    /**
     * Tells whether periodic checkpoint scheduling is currently active.
     *
     * @return the current value of the periodic scheduling flag (read without taking the lock)
     */
    public boolean isPeriodicCheckpointingStarted() {
        return periodicScheduling;
    }

    /**
     * Aborts every pending checkpoint, under the coordinator lock.
     *
     * @param checkpointException the exception each checkpoint is aborted with
     */
    public void abortPendingCheckpoints(CheckpointException checkpointException) {
        synchronized (lock) {
            abortPendingCheckpoints(anyPending -> true, checkpointException);
        }
    }

    /**
     * Aborts all pending checkpoints that match the given predicate.
     *
     * <p>Must be called while holding the coordinator lock (checked by an assertion).
     *
     * @param predicate selects the pending checkpoints to abort
     * @param checkpointException the exception each selected checkpoint is aborted with
     */
    private void abortPendingCheckpoints(Predicate<PendingCheckpoint> predicate, CheckpointException checkpointException) {
        assert Thread.holdsLock(lock);

        // Snapshot the matches into an array before aborting, so the loop does not iterate the
        // live map view while checkpoints are being aborted.
        final PendingCheckpoint[] toAbort =
                pendingCheckpoints.values().stream().filter(predicate).toArray(PendingCheckpoint[]::new);

        for (PendingCheckpoint pending : toAbort) {
            abortPendingCheckpoint(pending, checkpointException);
        }
    }

    /**
     * Replaces the current periodic trigger with a new one that first fires after the given delay.
     *
     * @param j initial delay, in milliseconds, passed on to {@code scheduleTriggerWithDelay}
     */
    private void rescheduleTrigger(long j) {
        cancelPeriodicTrigger();
        final ScheduledFuture<?> replacement = scheduleTriggerWithDelay(j);
        this.currentPeriodicTrigger = replacement;
    }

    /**
     * Cancels the current periodic trigger, if one is set, and clears the field.
     *
     * <p>The future is cancelled with {@code mayInterruptIfRunning = false}.
     */
    private void cancelPeriodicTrigger() {
        if (this.currentPeriodicTrigger == null) {
            return;
        }
        this.currentPeriodicTrigger.cancel(false);
        this.currentPeriodicTrigger = null;
    }

    /**
     * Picks a random initial delay for the first periodic trigger.
     *
     * @return a value drawn from {@code [minPauseBetweenCheckpoints, baseInterval]} (both
     *     inclusive; the upper bound handed to the generator is {@code baseInterval + 1})
     */
    private long getRandomInitDelay() {
        final ThreadLocalRandom random = ThreadLocalRandom.current();
        final long lowerInclusive = this.minPauseBetweenCheckpoints;
        final long upperExclusive = this.baseInterval + 1;
        return random.nextLong(lowerInclusive, upperExclusive);
    }

    /**
     * Schedules a new {@code ScheduledTrigger} on {@code timer} at a fixed rate.
     *
     * @param j initial delay in milliseconds before the first execution
     * @return the future returned by the timer for the scheduled trigger; the period between
     *     executions is {@code baseInterval} milliseconds
     */
    private ScheduledFuture<?> scheduleTriggerWithDelay(long j) {
        final long periodMillis = this.baseInterval;
        return this.timer.scheduleAtFixedRate(
                new ScheduledTrigger(), j, periodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Resets every operator coordinator in {@code coordinatorsToCheckpoint} to the given
     * checkpoint.
     *
     * <p>For each coordinator the operator state is looked up by operator id. If there is no
     * state for the operator, or that state carries no coordinator state, the coordinator is
     * reset with {@code null} data.
     *
     * @param j the checkpoint id passed to each coordinator's {@code resetToCheckpoint}
     * @param map operator states keyed by operator id
     * @throws Exception declared by this method; presumably propagated from
     *     {@code resetToCheckpoint} -- confirm against that method
     */
    private void restoreStateToCoordinators(long j, Map<OperatorID, OperatorState> map) throws Exception {
        for (OperatorCoordinatorCheckpointContext coordinator : this.coordinatorsToCheckpoint) {
            final OperatorState state = map.get(coordinator.operatorId());

            byte[] payload = null;
            if (state != null) {
                final ByteStreamStateHandle handle = state.getCoordinatorState();
                if (handle != null) {
                    payload = handle.getData();
                }
            }

            coordinator.resetToCheckpoint(j, payload);
        }
    }

    /**
     * Returns the job status listener of this coordinator, creating a
     * {@code CheckpointCoordinatorDeActivator} on first use. Runs while holding {@code lock}, and
     * repeated calls return the same cached instance.
     *
     * @return the cached (or newly created) listener
     * @throws IllegalArgumentException if the coordinator has already been shut down
     */
    public JobStatusListener createActivatorDeactivator() {
        synchronized (this.lock) {
            if (this.shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            if (this.jobStatusListener == null) {
                this.jobStatusListener = new CheckpointCoordinatorDeActivator(this);
            }
            return this.jobStatusListener;
        }
    }

    /**
     * Returns the number of queued requests reported by {@code requestDecider}, read while
     * holding {@code lock}.
     *
     * @return the request decider's queued request count
     */
    int getNumQueuedRequests() {
        synchronized (this.lock) {
            return this.requestDecider.getNumQueuedRequests();
        }
    }

    /**
     * Forwards incomplete checkpoint metrics of an execution attempt to the stats tracker.
     *
     * <p>Nothing is reported when {@code attemptMappingProvider} has no vertex for the attempt.
     *
     * @param j the checkpoint id the metrics belong to
     * @param executionAttemptID the attempt that produced the metrics
     * @param checkpointMetrics the metrics to report
     * @throws CheckpointException declared in the signature; not thrown by the code visible here
     */
    public void reportStats(long j, ExecutionAttemptID executionAttemptID, CheckpointMetrics checkpointMetrics) throws CheckpointException {
        final Optional<ExecutionVertex> vertex = this.attemptMappingProvider.getVertex(executionAttemptID);
        if (vertex.isPresent()) {
            this.statsTracker.reportIncompleteStats(j, vertex.get(), checkpointMetrics);
        }
    }

    /**
     * Submits a task to {@code executor} that discards the given subtask state.
     *
     * <p>Does nothing when {@code taskStateSnapshot} is {@code null}. Any {@link Throwable}
     * raised by the discard is logged as a warning and not rethrown.
     *
     * @param jobID job the state belongs to (used for logging only)
     * @param executionAttemptID attempt the state belongs to (used for logging only)
     * @param j checkpoint id the state belongs to (used for logging only)
     * @param taskStateSnapshot the state to discard, may be {@code null}
     */
    private void discardSubtaskState(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long j, final TaskStateSnapshot taskStateSnapshot) {
        if (taskStateSnapshot == null) {
            return;
        }
        this.executor.execute(() -> {
            try {
                taskStateSnapshot.discardState();
            } catch (Throwable failure) {
                // Best effort: a failed discard must not take down the executor thread.
                LOG.warn("Could not properly discard state object of checkpoint {} belonging to task {} of job {}.", j, executionAttemptID, jobID, failure);
            }
        });
    }

    /**
     * Aborts the given pending checkpoint without attributing the failure to a specific
     * execution attempt; delegates to the three-argument overload with a {@code null} attempt id.
     *
     * <p>Decompiler note: JADX reports that the access modifier of this method was changed from
     * {@code private}.
     *
     * @param pendingCheckpoint the checkpoint to abort
     * @param checkpointException the exception describing why the checkpoint is aborted
     */
    public void abortPendingCheckpoint(PendingCheckpoint pendingCheckpoint, CheckpointException checkpointException) {
        final ExecutionAttemptID noAttempt = null;
        abortPendingCheckpoint(pendingCheckpoint, checkpointException, noAttempt);
    }

    /**
     * Aborts the given pending checkpoint and performs the associated bookkeeping.
     *
     * <p>When assertions are enabled, the caller must hold {@code lock}. A checkpoint that is
     * already disposed is ignored. Otherwise the checkpoint is aborted and the failure manager is
     * informed. Afterwards, regardless of whether either step threw, the cleanup sequence runs
     * exactly once: abort messages are sent to the tasks to commit to, the checkpoint is removed
     * from {@code pendingCheckpoints}, its id is remembered as recent, and a trigger request is
     * scheduled.
     *
     * @param pendingCheckpoint the checkpoint to abort
     * @param checkpointException the exception describing why the checkpoint is aborted
     * @param executionAttemptID the attempt the failure is attributed to, or {@code null} if none
     */
    private void abortPendingCheckpoint(PendingCheckpoint pendingCheckpoint, CheckpointException checkpointException, @Nullable ExecutionAttemptID executionAttemptID) {
        if (!$assertionsDisabled && !Thread.holdsLock(this.lock)) {
            throw new AssertionError();
        }
        if (pendingCheckpoint.isDisposed()) {
            return;
        }
        try {
            pendingCheckpoint.abort(checkpointException.getCheckpointFailureReason(), checkpointException.getCause(), this.checkpointsCleaner, this::scheduleTriggerRequest, this.executor, this.statsTracker);
            this.failureManager.handleCheckpointException(pendingCheckpoint, pendingCheckpoint.getProps(), checkpointException, executionAttemptID, this.job, getStatsCallback(pendingCheckpoint), this.statsTracker);
        } finally {
            // Cleanup lives in a single finally block so that it runs exactly once. The previous
            // shape duplicated it inside the try body and a catch-all handler, which re-ran the
            // whole sequence (re-sending abort messages, re-scheduling the trigger) whenever one
            // of the cleanup steps itself threw on the success path.
            sendAbortedMessages(pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(), pendingCheckpoint.getCheckpointID(), pendingCheckpoint.getCheckpointTimestamp());
            this.pendingCheckpoints.remove(Long.valueOf(pendingCheckpoint.getCheckpointID()));
            rememberRecentCheckpointId(pendingCheckpoint.getCheckpointID());
            scheduleTriggerRequest();
        }
    }

    /**
     * Verifies that the coordinator is in a state that permits triggering a checkpoint.
     *
     * @param z when {@code true}, periodic scheduling must additionally be active
     * @throws CheckpointException with reason {@code CHECKPOINT_COORDINATOR_SHUTDOWN} if the
     *     coordinator is shut down, or with reason {@code PERIODIC_SCHEDULER_SHUTDOWN} if
     *     {@code z} is {@code true} while periodic scheduling is not active
     */
    private void preCheckGlobalState(boolean z) throws CheckpointException {
        if (this.shutdown) {
            throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
        }
        if (!z || this.periodicScheduling) {
            return;
        }
        throw new CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
    }

    /**
     * Aborts all requests held by {@code requestDecider} and then all pending checkpoints, both
     * with the same exception.
     *
     * <p>When assertions are enabled, the caller must hold {@code lock}.
     *
     * @param checkpointException the exception describing why everything is aborted
     */
    private void abortPendingAndQueuedCheckpoints(CheckpointException checkpointException) {
        if (!($assertionsDisabled || Thread.holdsLock(this.lock))) {
            throw new AssertionError();
        }
        // Queued requests first, then the checkpoints that are already in flight.
        this.requestDecider.abortAll(checkpointException);
        abortPendingCheckpoints(checkpointException);
    }

    /**
     * Derives the {@code CheckpointException} to report for a failure.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>if an {@link IOException} is found via {@code ExceptionUtils.findThrowable}, a new
     *       exception with reason {@code IO_EXCEPTION} wrapping {@code th};
     *   <li>otherwise, a {@code CheckpointException} found the same way, if any;
     *   <li>otherwise, a new exception with the supplied default reason wrapping {@code th}.
     * </ol>
     *
     * @param checkpointFailureReason the reason to use when no more specific one is found
     * @param th the failure to translate
     * @return the checkpoint exception to report
     */
    private static CheckpointException getCheckpointException(CheckpointFailureReason checkpointFailureReason, Throwable th) {
        if (ExceptionUtils.findThrowable(th, IOException.class).isPresent()) {
            return new CheckpointException(CheckpointFailureReason.IO_EXCEPTION, th);
        }
        return (CheckpointException) ExceptionUtils.findThrowable(th, CheckpointException.class)
                .orElseGet(() -> new CheckpointException(checkpointFailureReason, th));
    }

    /**
     * Registers a new pending checkpoint with the stats tracker and reports the plan's finished
     * tasks against it.
     *
     * <p>The tracker receives a map from job vertex id to parallelism, built from the distinct job
     * vertices of all tasks to wait for plus all finished tasks in the plan.
     *
     * @param j first {@code long} argument forwarded to {@code reportPendingCheckpoint}
     *     (presumably the checkpoint id -- confirm against the tracker)
     * @param checkpointPlan the plan supplying the tasks to wait for and the finished tasks
     * @param checkpointProperties the properties of the checkpoint
     * @param j2 second {@code long} argument forwarded to {@code reportPendingCheckpoint}
     *     (presumably the trigger timestamp -- confirm against the tracker)
     * @return the stats object returned by the tracker
     */
    private PendingCheckpointStats trackPendingCheckpointStats(long j, CheckpointPlan checkpointPlan, CheckpointProperties checkpointProperties, long j2) {
        final Map<JobVertexID, Integer> parallelismByVertex =
                Stream.concat(checkpointPlan.getTasksToWaitFor().stream(), checkpointPlan.getFinishedTasks().stream())
                        .map(Execution::getVertex)
                        .map(ExecutionVertex::getJobVertex)
                        .distinct()
                        .collect(Collectors.toMap(ExecutionJobVertex::getJobVertexId, ExecutionJobVertex::getParallelism));

        final PendingCheckpointStats pendingStats =
                this.statsTracker.reportPendingCheckpoint(j, j2, checkpointProperties, parallelismByVertex);
        reportFinishedTasks(pendingStats, checkpointPlan.getFinishedTasks());
        return pendingStats;
    }

    /**
     * Reports a {@code SubtaskStateStats} entry for each finished task to the given pending
     * checkpoint stats.
     *
     * <p>All entries share one timestamp, taken from {@link System#currentTimeMillis()} once
     * before the loop.
     *
     * @param pendingCheckpointStats the stats object to report to
     * @param list the finished executions
     */
    private void reportFinishedTasks(PendingCheckpointStats pendingCheckpointStats, List<Execution> list) {
        final long now = System.currentTimeMillis();
        for (Execution finished : list) {
            final SubtaskStateStats subtaskStats =
                    new SubtaskStateStats(finished.getParallelSubtaskIndex(), now);
            pendingCheckpointStats.reportSubtaskStats(finished.getVertex().getJobvertexId(), subtaskStats);
        }
    }

    /**
     * Looks up the pending checkpoint stats tracked for the given checkpoint's id.
     *
     * @param pendingCheckpoint the checkpoint whose stats are requested
     * @return whatever the stats tracker returns for that id; may be {@code null}
     */
    @Nullable
    private PendingCheckpointStats getStatsCallback(PendingCheckpoint pendingCheckpoint) {
        final long checkpointId = pendingCheckpoint.getCheckpointID();
        return this.statsTracker.getPendingCheckpointStats(checkpointId);
    }

    // Static initializer as emitted by the decompiler.
    static {
        // True when assertions are NOT enabled for this class; the explicit lock-ownership
        // checks in this file are skipped in that case.
        $assertionsDisabled = !CheckpointCoordinator.class.desiredAssertionStatus();
        // Class-wide logger obtained from LoggerFactory.
        LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
    }
}
