/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckpointFailureManager {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointFailureManager.class);
    public static final int UNLIMITED_TOLERABLE_FAILURE_NUMBER = Integer.MAX_VALUE;
    public static final String EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE = "Exceeded checkpoint tolerable failure threshold.";
    private static final int UNKNOWN_CHECKPOINT_ID = -1;
    private final int tolerableCpFailureNumber;
    private final FailJobCallback failureCallback;
    private final AtomicInteger continuousFailureCounter;
    private final Set<Long> countedCheckpointIds;
    private long lastSucceededCheckpointId = Long.MIN_VALUE;

    public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) {
        Preconditions.checkArgument(tolerableCpFailureNumber >= 0, "The tolerable checkpoint failure number is illegal, it must be greater than or equal to 0 .");
        this.tolerableCpFailureNumber = tolerableCpFailureNumber;
        this.continuousFailureCounter = new AtomicInteger(0);
        this.failureCallback = Preconditions.checkNotNull(failureCallback);
        this.countedCheckpointIds = ConcurrentHashMap.newKeySet();
    }

    public void handleCheckpointException(@Nullable PendingCheckpoint pendingCheckpoint, CheckpointProperties checkpointProperties, CheckpointException exception, @Nullable ExecutionAttemptID executionAttemptID, JobID job, @Nullable PendingCheckpointStats pendingCheckpointStats, CheckpointStatsTracker statsTracker) {
        long checkpointId = pendingCheckpoint == null ? -1L : pendingCheckpoint.getCheckpointID();
        this.updateStatsAfterCheckpointFailed(pendingCheckpointStats, statsTracker, exception);
        if (CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING.equals((Object)exception.getCheckpointFailureReason())) {
            LOG.info("Failed to trigger checkpoint for job {} since {}.", (Object)job, (Object)exception.getMessage());
        } else {
            LOG.warn("Failed to trigger or complete checkpoint {} for job {}. ({} consecutive failed attempts so far)", new Object[]{checkpointId == -1L ? "UNKNOWN_CHECKPOINT_ID" : Long.valueOf(checkpointId), job, this.continuousFailureCounter.get(), exception});
        }
        if (this.isJobManagerFailure(exception, executionAttemptID)) {
            this.handleJobLevelCheckpointException(checkpointProperties, exception, checkpointId);
        } else {
            this.handleTaskLevelCheckpointException(Preconditions.checkNotNull(pendingCheckpoint), exception, Preconditions.checkNotNull(executionAttemptID));
        }
    }

    private void updateStatsAfterCheckpointFailed(@Nullable PendingCheckpointStats pendingCheckpointStats, CheckpointStatsTracker statsTracker, CheckpointException exception) {
        if (pendingCheckpointStats != null) {
            long failureTimestamp = System.currentTimeMillis();
            statsTracker.reportFailedCheckpoint(pendingCheckpointStats.toFailedCheckpoint(failureTimestamp, exception));
        } else {
            statsTracker.reportFailedCheckpointsWithoutInProgress();
        }
    }

    private boolean isJobManagerFailure(CheckpointException exception, @Nullable ExecutionAttemptID executionAttemptID) {
        return CheckpointFailureManager.isPreFlightFailure(exception) || executionAttemptID == null;
    }

    void handleJobLevelCheckpointException(CheckpointProperties checkpointProperties, CheckpointException exception, long checkpointId) {
        if (!checkpointProperties.isSavepoint()) {
            this.checkFailureAgainstCounter(exception, checkpointId, this.failureCallback::failJob);
        }
    }

    void handleTaskLevelCheckpointException(PendingCheckpoint pendingCheckpoint, CheckpointException exception, ExecutionAttemptID executionAttemptID) {
        CheckpointProperties checkpointProps = pendingCheckpoint.getProps();
        if (checkpointProps.isSavepoint() && checkpointProps.isSynchronous()) {
            this.failureCallback.failJob(exception);
        } else {
            this.checkFailureAgainstCounter(exception, pendingCheckpoint.getCheckpointID(), e -> this.failureCallback.failJobDueToTaskFailure((Throwable)e, executionAttemptID));
        }
    }

    private void checkFailureAgainstCounter(CheckpointException exception, long checkpointId, Consumer<FlinkRuntimeException> errorHandler) {
        if (checkpointId == -1L || checkpointId > this.lastSucceededCheckpointId) {
            this.checkFailureCounter(exception, checkpointId);
            if (this.continuousFailureCounter.get() > this.tolerableCpFailureNumber) {
                this.clearCount();
                String exceptionMessage = String.format("%s The latest checkpoint failed due to %s, view the Checkpoint History tab or the Job Manager log to find out why continuous checkpoints failed.", EXCEEDED_CHECKPOINT_TOLERABLE_FAILURE_MESSAGE, exception.getCheckpointFailureReason().message());
                errorHandler.accept(new FlinkRuntimeException(exceptionMessage));
            }
        }
    }

    public void checkFailureCounter(CheckpointException exception, long checkpointId) {
        if (this.tolerableCpFailureNumber == Integer.MAX_VALUE) {
            return;
        }
        CheckpointFailureReason reason = exception.getCheckpointFailureReason();
        switch (reason) {
            case PERIODIC_SCHEDULER_SHUTDOWN: 
            case TOO_MANY_CHECKPOINT_REQUESTS: 
            case MINIMUM_TIME_BETWEEN_CHECKPOINTS: 
            case NOT_ALL_REQUIRED_TASKS_RUNNING: 
            case CHECKPOINT_SUBSUMED: 
            case CHECKPOINT_COORDINATOR_SUSPEND: 
            case CHECKPOINT_COORDINATOR_SHUTDOWN: 
            case CHANNEL_STATE_SHARED_STREAM_EXCEPTION: 
            case JOB_FAILOVER_REGION: 
            case CHECKPOINT_DECLINED_TASK_NOT_READY: 
            case CHECKPOINT_DECLINED_TASK_CLOSING: 
            case CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER: 
            case CHECKPOINT_DECLINED_SUBSUMED: 
            case CHECKPOINT_DECLINED_INPUT_END_OF_STREAM: 
            case TASK_FAILURE: 
            case TASK_CHECKPOINT_FAILURE: 
            case UNKNOWN_TASK_CHECKPOINT_NOTIFICATION_FAILURE: 
            case TRIGGER_CHECKPOINT_FAILURE: {
                break;
            }
            case IO_EXCEPTION: 
            case CHECKPOINT_ASYNC_EXCEPTION: 
            case CHECKPOINT_DECLINED: 
            case CHECKPOINT_EXPIRED: 
            case FINALIZE_CHECKPOINT_FAILURE: {
                if (checkpointId != -1L && !this.countedCheckpointIds.add(checkpointId)) break;
                this.continuousFailureCounter.incrementAndGet();
                break;
            }
            default: {
                throw new FlinkRuntimeException("Unknown checkpoint failure reason : " + reason.name());
            }
        }
    }

    public void handleCheckpointSuccess(long checkpointId) {
        if (checkpointId > this.lastSucceededCheckpointId) {
            this.lastSucceededCheckpointId = checkpointId;
            this.clearCount();
        }
    }

    private void clearCount() {
        this.continuousFailureCounter.set(0);
        this.countedCheckpointIds.clear();
    }

    private static boolean isPreFlightFailure(Throwable cause) {
        return ExceptionUtils.findThrowable(cause, CheckpointException.class).map(CheckpointException::getCheckpointFailureReason).map(CheckpointFailureReason::isPreFlight).orElse(false);
    }

    public static interface FailJobCallback {
        public void failJob(Throwable var1);

        public void failJobDueToTaskFailure(Throwable var1, ExecutionAttemptID var2);
    }
}

