/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A set of connected execution vertices that are cancelled and restarted together when one of
 * their executions fails.
 *
 * <p>The region keeps its own {@link JobStatus}. As implemented below, a failover walks the
 * region through {@code RUNNING -> CANCELLING -> CANCELED -> CREATED -> RUNNING}. State changes
 * go through a compare-and-set on the volatile {@code state} field (see
 * {@link #transitionState(JobStatus, JobStatus)}); the only direct write is the fallback to
 * {@code RUNNING} in {@link #reset(long)} when a {@link GlobalModVersionMismatch} is caught.
 */
public class FailoverRegion {
    /** Performs the compare-and-set updates of {@link #state}. */
    private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");

    private static final Logger LOG = LoggerFactory.getLogger(FailoverRegion.class);

    /** Identifier of this region; only used in log messages within this class. */
    private final AbstractID id = new AbstractID();

    /** The execution graph this region belongs to. */
    private final ExecutionGraph executionGraph;

    /** The vertices that are cancelled, reset and rescheduled as one unit. */
    private final List<ExecutionVertex> connectedExecutionVertexes;

    /** Job vertices handed to the checkpoint coordinator when state is restored on restart. */
    private final Map<JobVertexID, ExecutionJobVertex> tasks;

    /** Current state of the region; a freshly created region is {@code RUNNING}. */
    private volatile JobStatus state = JobStatus.RUNNING;

    /**
     * Creates a new failover region.
     *
     * @param executionGraph the graph the region belongs to, must not be null
     * @param connectedExecutions the vertices that make up the region, must not be null
     * @param tasks the job vertices passed to the checkpoint coordinator on restart, must not be
     *     null
     * @throws NullPointerException if any argument is null
     */
    public FailoverRegion(
            ExecutionGraph executionGraph,
            List<ExecutionVertex> connectedExecutions,
            Map<JobVertexID, ExecutionJobVertex> tasks) {
        this.executionGraph = Preconditions.checkNotNull(executionGraph);
        this.connectedExecutionVertexes = Preconditions.checkNotNull(connectedExecutions);
        this.tasks = Preconditions.checkNotNull(tasks);
        LOG.debug("Created failover region {} with vertices: {}", this.id, connectedExecutions);
    }

    /**
     * Reacts to the failure of an execution that belongs to this region.
     *
     * <p>If the restart strategy does not allow a restart, the whole graph is failed with the
     * given cause. Otherwise the region is cancelled, which eventually leads to a reset and
     * restart of all its vertices.
     *
     * @param taskExecution the failed execution; its global modification version is carried
     *     through the whole failover
     * @param cause the failure cause, forwarded to the graph on a global failure
     */
    public void onExecutionFail(Execution taskExecution, Throwable cause) {
        if (!this.executionGraph.getRestartStrategy().canRestart()) {
            this.executionGraph.failGlobal(cause);
        } else {
            this.cancel(taskExecution.getGlobalModVersion());
        }
    }

    /**
     * Callback invoked once the cancellation futures of all vertices have completed. Moves the
     * region from {@code CANCELLING} to {@code CANCELED} and then resets it.
     *
     * @param globalModVersionOfFailover the global modification version this failover started
     *     with
     */
    private void allVerticesInTerminalState(long globalModVersionOfFailover) {
        // Retry the CAS for as long as the region is still observed as CANCELLING.
        while (true) {
            JobStatus curStatus = this.state;
            if (curStatus != JobStatus.CANCELLING) {
                LOG.info("FailoverRegion {} is {} when allVerticesInTerminalState.", this.id, curStatus);
                return;
            }
            if (this.transitionState(curStatus, JobStatus.CANCELED)) {
                this.reset(globalModVersionOfFailover);
                return;
            }
        }
    }

    /**
     * Returns the current state of this region.
     *
     * @return the current state
     */
    public JobStatus getState() {
        return this.state;
    }

    /**
     * Re-enters the failover procedure after a step of a previous attempt failed.
     *
     * <p>Fails the whole graph when the restart strategy does not allow a restart. Otherwise a
     * {@code RUNNING} region is cancelled, a {@code CANCELED} region is reset, and any other state
     * is only logged.
     *
     * @param globalModVersionOfFailover the global modification version this failover started
     *     with
     */
    private void failover(long globalModVersionOfFailover) {
        if (!this.executionGraph.getRestartStrategy().canRestart()) {
            this.executionGraph.failGlobal(new FlinkException("RestartStrategy validate fail"));
            return;
        }

        JobStatus curStatus = this.state;
        if (curStatus == JobStatus.RUNNING) {
            this.cancel(globalModVersionOfFailover);
        } else if (curStatus == JobStatus.CANCELED) {
            this.reset(globalModVersionOfFailover);
        } else {
            LOG.info("FailoverRegion {} is {} when notified to failover.", this.id, curStatus);
        }
    }

    /**
     * Switches a {@code RUNNING} region to {@code CANCELLING}, cancels all of its vertices and
     * registers {@link #allVerticesInTerminalState(long)} to run once every cancellation future
     * has completed. A region in any other state is left untouched and only logged.
     *
     * <p>Asserts that it is called from the job master main thread.
     *
     * @param globalModVersionOfFailover the global modification version this failover started
     *     with
     */
    private void cancel(long globalModVersionOfFailover) {
        this.executionGraph.getJobMasterMainThreadExecutor().assertRunningInMainThread();

        // Retry the CAS for as long as the region is still observed as RUNNING.
        while (true) {
            JobStatus curStatus = this.state;
            if (curStatus != JobStatus.RUNNING) {
                LOG.info("FailoverRegion {} is {} when cancel.", this.id, curStatus);
                return;
            }
            if (this.transitionState(curStatus, JobStatus.CANCELLING)) {
                this.createTerminationFutureOverAllConnectedVertexes()
                        .thenAccept(ignored -> this.allVerticesInTerminalState(globalModVersionOfFailover));
                return;
            }
        }
    }

    /**
     * Cancels every vertex of the region and combines the futures returned by the individual
     * {@code cancel()} calls.
     *
     * @return the future produced by {@link FutureUtils#waitForAll} over all cancellation futures
     */
    @VisibleForTesting
    protected CompletableFuture<Void> createTerminationFutureOverAllConnectedVertexes() {
        List<CompletableFuture<?>> futures = new ArrayList<>(this.connectedExecutionVertexes.size());
        for (ExecutionVertex vertex : this.connectedExecutionVertexes) {
            futures.add(vertex.cancel());
        }
        return FutureUtils.waitForAll(futures);
    }

    /**
     * Resets all vertices of the region for a new execution attempt and, if the region can be
     * moved from {@code CANCELED} to {@code CREATED}, restarts it.
     *
     * <p>Co-location constraints are reset once per distinct {@link CoLocationGroup}. On a
     * {@link GlobalModVersionMismatch} the region simply goes back to {@code RUNNING}; on any
     * other failure the failover procedure is entered again.
     *
     * @param globalModVersionOfFailover the global modification version this failover started
     *     with
     */
    private void reset(long globalModVersionOfFailover) {
        try {
            // Tracks the groups already handled so each group's constraints are reset only once.
            HashSet<CoLocationGroup> colGroups = new HashSet<>();
            long restartTimestamp = System.currentTimeMillis();

            for (ExecutionVertex ev : this.connectedExecutionVertexes) {
                CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup();
                if (cgroup != null && !colGroups.contains(cgroup)) {
                    cgroup.resetConstraints();
                    colGroups.add(cgroup);
                }
                ev.resetForNewExecution(restartTimestamp, globalModVersionOfFailover);
            }

            if (this.transitionState(JobStatus.CANCELED, JobStatus.CREATED)) {
                this.restart(globalModVersionOfFailover);
            } else {
                LOG.info("FailoverRegion {} switched from CANCELED to CREATED fail, will fail this region again.", this.id);
                this.failover(globalModVersionOfFailover);
            }
        }
        catch (GlobalModVersionMismatch e) {
            // NOTE(review): presumably raised when a global failover changed the modification
            // version in the meantime - confirm against ExecutionVertex#resetForNewExecution.
            // The regional failover is abandoned and the region returns to a clean state.
            this.state = JobStatus.RUNNING;
        }
        catch (Throwable e) {
            // Keep the cause in the log; it is the only trace of why the reset failed.
            LOG.info("FailoverRegion {} reset fail, will failover again.", this.id, e);
            this.failover(globalModVersionOfFailover);
        }
    }

    /**
     * Moves the region from {@code CREATED} to {@code RUNNING}, lets the checkpoint coordinator
     * (if there is one) abort pending checkpoints and restore the latest checkpointed state for
     * {@link #tasks}, and then schedules every vertex of the region.
     *
     * <p>The prior allocations of all vertices in the region are collected and passed to each
     * {@code scheduleForExecution} call. Any failure re-enters the failover procedure.
     *
     * @param globalModVersionOfFailover the global modification version this failover started
     *     with
     */
    private void restart(long globalModVersionOfFailover) {
        try {
            if (!this.transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
                LOG.info("FailoverRegion {} switched from CREATED to RUNNING fail, will fail this region again.", this.id);
                this.failover(globalModVersionOfFailover);
                return;
            }

            if (this.executionGraph.getCheckpointCoordinator() != null) {
                this.executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(
                        new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
                // The meaning of the two boolean flags is defined by the checkpoint coordinator.
                this.executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(this.tasks, false, true);
            }

            HashSet<AllocationID> previousAllocationsInRegion =
                    new HashSet<>(this.connectedExecutionVertexes.size());
            for (ExecutionVertex connectedExecutionVertex : this.connectedExecutionVertexes) {
                AllocationID latestPriorAllocation = connectedExecutionVertex.getLatestPriorAllocation();
                if (latestPriorAllocation != null) {
                    previousAllocationsInRegion.add(latestPriorAllocation);
                }
            }

            for (ExecutionVertex ev : this.connectedExecutionVertexes) {
                try {
                    ev.scheduleForExecution(
                            this.executionGraph.getSlotProviderStrategy(),
                            LocationPreferenceConstraint.ANY,
                            previousAllocationsInRegion);
                }
                catch (Throwable e) {
                    LOG.info("FailoverRegion {} failed to schedule a vertex, failover again.", this.id, e);
                    // NOTE(review): the loop keeps scheduling the remaining vertices after
                    // failover() was triggered - confirm this is intended before changing it.
                    this.failover(globalModVersionOfFailover);
                }
            }
        }
        catch (Exception e) {
            LOG.info("FailoverRegion {} restart failed, failover again.", this.id, e);
            this.failover(globalModVersionOfFailover);
        }
    }

    /**
     * Atomically switches the region from {@code current} to {@code newState}.
     *
     * @param current the state the region is expected to be in
     * @param newState the state to switch to
     * @return {@code true} if the switch happened, {@code false} if the region was not in
     *     {@code current}
     */
    private boolean transitionState(JobStatus current, JobStatus newState) {
        if (!STATE_UPDATER.compareAndSet(this, current, newState)) {
            return false;
        }
        LOG.info("FailoverRegion {} switched from state {} to {}.", this.id, current, newState);
        return true;
    }
}

