/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.InternalExecutionGraphAccessor;
import org.apache.flink.runtime.executiongraph.JobVertexInputInfo;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexInitializedEvent;
import org.apache.flink.runtime.jobmaster.event.ExecutionVertexFinishedEvent;
import org.apache.flink.runtime.jobmaster.event.ExecutionVertexResetEvent;
import org.apache.flink.runtime.jobmaster.event.JobEvent;
import org.apache.flink.runtime.jobmaster.event.JobEventManager;
import org.apache.flink.runtime.jobmaster.event.JobEventReplayHandler;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryContext;
import org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryHandler;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.shuffle.DefaultShuffleMasterSnapshotContext;
import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMasterSnapshot;
import org.apache.flink.shaded.guava32.com.google.common.collect.Sets;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultBatchJobRecoveryHandler
implements BatchJobRecoveryHandler,
JobEventReplayHandler {
    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final JobEventManager jobEventManager;
    private final JobID jobId;
    private BatchJobRecoveryContext context;
    private long lastSnapshotRelativeTime;
    private final Set<JobVertexID> needToSnapshotJobVertices = new HashSet<JobVertexID>();
    private static final ResourceID UNKNOWN_PRODUCER = ResourceID.generate();
    private final long snapshotMinPauseMills;
    private Clock clock;
    private final Map<ExecutionVertexID, ExecutionVertexFinishedEvent> executionVertexFinishedEventMap = new LinkedHashMap<ExecutionVertexID, ExecutionVertexFinishedEvent>();
    private final List<ExecutionJobVertexInitializedEvent> jobVertexInitializedEvents = new ArrayList<ExecutionJobVertexInitializedEvent>();
    private final Set<JobVertexID> jobVerticesWithUnRecoveredCoordinators = new HashSet<JobVertexID>();
    private final Duration previousWorkerRecoveryTimeout;

    public DefaultBatchJobRecoveryHandler(JobEventManager jobEventManager, Configuration jobMasterConfiguration, JobID jobId) {
        this.jobEventManager = jobEventManager;
        this.jobId = Preconditions.checkNotNull(jobId);
        this.previousWorkerRecoveryTimeout = jobMasterConfiguration.get(BatchExecutionOptions.JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT);
        this.snapshotMinPauseMills = jobMasterConfiguration.get(BatchExecutionOptions.JOB_RECOVERY_SNAPSHOT_MIN_PAUSE).toMillis();
    }

    @Override
    public void initialize(BatchJobRecoveryContext context) {
        this.context = Preconditions.checkNotNull(context);
        this.clock = SystemClock.getInstance();
        try {
            this.jobEventManager.start();
        }
        catch (Throwable throwable) {
            context.failJob(throwable, System.currentTimeMillis(), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
        }
    }

    @Override
    public void stop(boolean cleanUp) {
        this.jobEventManager.stop(cleanUp);
    }

    @Override
    public void startRecovering() {
        this.context.getMainThreadExecutor().assertRunningInMainThread();
        this.startRecoveringInternal();
        this.context.getShuffleMaster().notifyPartitionRecoveryStarted(this.context.getExecutionGraph().getJobID());
        if (!this.jobEventManager.replay(this)) {
            this.log.warn("Fail to replay log for {}, will start the job as a new one.", (Object)this.context.getExecutionGraph().getJobID());
            this.recoverFailed();
            return;
        }
        this.log.info("Replay all job events successfully.");
        this.recoverPartitions().whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                this.recoverFailed();
            }
            try {
                this.recoverFinished();
            }
            catch (Exception exception) {
                this.recoverFailed();
            }
        });
    }

    @Override
    public boolean needRecover() {
        try {
            return this.jobEventManager.hasJobEvents();
        }
        catch (Throwable throwable) {
            this.context.failJob(throwable, System.currentTimeMillis(), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            return false;
        }
    }

    @Override
    public boolean isRecovering() {
        return this.context.getExecutionGraph().getState() == JobStatus.RECONCILING;
    }

    private void restoreShuffleMaster(List<ShuffleMasterSnapshot> snapshots) {
        Preconditions.checkState(this.context.getShuffleMaster().supportsBatchSnapshot());
        this.context.getShuffleMaster().restoreState(snapshots, this.jobId);
    }

    private void startRecoveringInternal() {
        this.log.info("Try to recover status from previously failed job master.");
        this.context.getExecutionGraph().transitionState(JobStatus.CREATED, JobStatus.RECONCILING);
    }

    private void restoreOperatorCoordinators(Map<OperatorID, byte[]> snapshots, Map<OperatorID, JobVertexID> operatorToJobVertex) throws Exception {
        for (Map.Entry<OperatorID, byte[]> entry : snapshots.entrySet()) {
            OperatorID operatorId = entry.getKey();
            JobVertexID jobVertexId = Preconditions.checkNotNull(operatorToJobVertex.get(operatorId));
            ExecutionJobVertex jobVertex = this.getExecutionJobVertex(jobVertexId);
            this.log.info("Restore operator coordinators of {} from job event.", (Object)jobVertex.getName());
            for (OperatorCoordinatorHolder holder : jobVertex.getOperatorCoordinators()) {
                if (!holder.coordinator().supportsBatchSnapshot()) continue;
                byte[] snapshot = snapshots.get(holder.operatorId());
                holder.resetToCheckpoint(-1L, snapshot);
            }
        }
        this.determineVerticesForResetAfterRestoreOpCoordinator();
    }

    @Override
    public void startReplay() {
    }

    @Override
    public void replayOneEvent(JobEvent jobEvent) {
        if (jobEvent instanceof ExecutionVertexFinishedEvent) {
            ExecutionVertexFinishedEvent event = (ExecutionVertexFinishedEvent)jobEvent;
            this.executionVertexFinishedEventMap.put(event.getExecutionVertexId(), event);
        } else if (jobEvent instanceof ExecutionVertexResetEvent) {
            ExecutionVertexResetEvent event = (ExecutionVertexResetEvent)jobEvent;
            for (ExecutionVertexID executionVertexId : event.getExecutionVertexIds()) {
                this.executionVertexFinishedEventMap.remove(executionVertexId);
            }
        } else if (jobEvent instanceof ExecutionJobVertexInitializedEvent) {
            this.jobVertexInitializedEvents.add((ExecutionJobVertexInitializedEvent)jobEvent);
        } else {
            throw new IllegalStateException("Unsupported job event " + jobEvent);
        }
    }

    @Override
    public void finalizeReplay() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList<ExecutionJobVertex> initializedJobVertices = new ArrayList<ExecutionJobVertex>();
        for (ExecutionJobVertexInitializedEvent event : this.jobVertexInitializedEvents) {
            ExecutionJobVertex jobVertex = this.getExecutionJobVertex(event.getJobVertexId());
            this.context.initializeJobVertex(jobVertex, event.getParallelism(), event.getJobVertexInputInfos(), currentTimeMillis);
            initializedJobVertices.add(jobVertex);
        }
        this.context.updateTopology(initializedJobVertices);
        LinkedList<ExecutionVertexFinishedEvent> finishedEvents = new LinkedList<ExecutionVertexFinishedEvent>(this.executionVertexFinishedEventMap.values());
        while (!finishedEvents.isEmpty() && !finishedEvents.getLast().hasOperatorCoordinatorAndShuffleMasterSnapshots()) {
            finishedEvents.removeLast();
        }
        if (finishedEvents.isEmpty()) {
            return;
        }
        HashMap<OperatorID, byte[]> operatorCoordinatorSnapshots = new HashMap<OperatorID, byte[]>();
        List<ShuffleMasterSnapshot> shuffleMasterSnapshots = new ArrayList<ShuffleMasterSnapshot>();
        for (ExecutionVertexFinishedEvent executionVertexFinishedEvent : finishedEvents) {
            JobVertexID jobVertexId = executionVertexFinishedEvent.getExecutionVertexId().getJobVertexId();
            ExecutionJobVertex jobVertex = this.context.getExecutionGraph().getJobVertex(jobVertexId);
            Preconditions.checkState(jobVertex.isInitialized());
            int subTaskIndex = executionVertexFinishedEvent.getExecutionVertexId().getSubtaskIndex();
            Execution execution = jobVertex.getTaskVertices()[subTaskIndex].getCurrentExecutionAttempt();
            execution.recoverExecution(executionVertexFinishedEvent.getExecutionAttemptId(), executionVertexFinishedEvent.getTaskManagerLocation(), executionVertexFinishedEvent.getUserAccumulators(), executionVertexFinishedEvent.getIOMetrics());
            for (Map.Entry<OperatorID, CompletableFuture<byte[]>> entry : executionVertexFinishedEvent.getOperatorCoordinatorSnapshotFutures().entrySet()) {
                Preconditions.checkState(entry.getValue().isDone());
                operatorCoordinatorSnapshots.put(entry.getKey(), entry.getValue().get());
            }
            if (executionVertexFinishedEvent.getShuffleMasterSnapshotFuture() == null) continue;
            Preconditions.checkState(executionVertexFinishedEvent.getShuffleMasterSnapshotFuture().isDone());
            ShuffleMasterSnapshot shuffleMasterSnapshot = executionVertexFinishedEvent.getShuffleMasterSnapshotFuture().get();
            if (shuffleMasterSnapshot.isIncremental()) {
                shuffleMasterSnapshots.add(shuffleMasterSnapshot);
                continue;
            }
            shuffleMasterSnapshots = Arrays.asList(shuffleMasterSnapshot);
        }
        HashMap<OperatorID, JobVertexID> operatorToJobVertex = new HashMap<OperatorID, JobVertexID>();
        for (ExecutionJobVertex jobVertex : this.context.getExecutionGraph().getAllVertices().values()) {
            if (!jobVertex.isInitialized()) continue;
            for (OperatorCoordinatorHolder holder : jobVertex.getOperatorCoordinators()) {
                operatorToJobVertex.put(holder.operatorId(), jobVertex.getJobVertexId());
            }
        }
        try {
            this.restoreOperatorCoordinators(operatorCoordinatorSnapshots, operatorToJobVertex);
        }
        catch (Exception exception) {
            this.log.warn("Restore coordinator operator failed.", (Throwable)exception);
            throw exception;
        }
        this.restoreShuffleMaster(shuffleMasterSnapshots);
    }

    @Override
    public void onExecutionVertexReset(Collection<ExecutionVertexID> vertices) {
        Preconditions.checkState(!this.isRecovering());
        this.jobEventManager.writeEvent(new ExecutionVertexResetEvent(new ArrayList<ExecutionVertexID>(vertices)), false);
    }

    @Override
    public void onExecutionJobVertexInitialization(JobVertexID jobVertexId, int parallelism, Map<IntermediateDataSetID, JobVertexInputInfo> jobVertexInputInfos) {
        Preconditions.checkState(!this.isRecovering());
        this.jobEventManager.writeEvent(new ExecutionJobVertexInitializedEvent(jobVertexId, parallelism, jobVertexInputInfos), false);
    }

    @Override
    public void onExecutionFinished(ExecutionVertexID executionVertexId) {
        Preconditions.checkState(!this.isRecovering());
        Execution execution = this.getExecutionVertex(executionVertexId).getCurrentExecutionAttempt();
        ExecutionJobVertex jobVertex = execution.getVertex().getJobVertex();
        boolean jobVertexFinished = jobVertex.getAggregateState() == ExecutionState.FINISHED;
        this.needToSnapshotJobVertices.add(executionVertexId.getJobVertexId());
        HashMap<OperatorID, CompletableFuture<byte[]>> operatorCoordinatorSnapshotFutures = new HashMap<OperatorID, CompletableFuture<byte[]>>();
        CompletableFuture<ShuffleMasterSnapshot> shuffleMasterSnapshotFuture = null;
        long currentRelativeTime = this.clock.relativeTimeMillis();
        if (jobVertexFinished || currentRelativeTime - this.lastSnapshotRelativeTime >= this.snapshotMinPauseMills) {
            operatorCoordinatorSnapshotFutures.putAll(this.snapshotOperatorCoordinators());
            this.lastSnapshotRelativeTime = currentRelativeTime;
            this.needToSnapshotJobVertices.clear();
            shuffleMasterSnapshotFuture = this.snapshotShuffleMaster();
        }
        this.jobEventManager.writeEvent(new ExecutionVertexFinishedEvent(execution.getAttemptId(), execution.getAssignedResourceLocation(), operatorCoordinatorSnapshotFutures, shuffleMasterSnapshotFuture, execution.getIOMetrics(), execution.getUserAccumulators()), jobVertexFinished);
    }

    private Map<OperatorID, CompletableFuture<byte[]>> snapshotOperatorCoordinators() {
        HashMap<OperatorID, CompletableFuture<byte[]>> snapshotFutures = new HashMap<OperatorID, CompletableFuture<byte[]>>();
        for (JobVertexID jobVertexId : this.needToSnapshotJobVertices) {
            ExecutionJobVertex jobVertex = Preconditions.checkNotNull(this.getExecutionJobVertex(jobVertexId));
            this.log.info("Snapshot operator coordinators of {} to job event, checkpointId {}.", (Object)jobVertex.getName(), (Object)-1L);
            for (OperatorCoordinatorHolder holder : jobVertex.getOperatorCoordinators()) {
                if (!holder.coordinator().supportsBatchSnapshot()) continue;
                CompletableFuture<byte[]> checkpointFuture = new CompletableFuture<byte[]>();
                holder.checkpointCoordinator(-1L, checkpointFuture);
                snapshotFutures.put(holder.operatorId(), checkpointFuture);
            }
        }
        return snapshotFutures;
    }

    private CompletableFuture<ShuffleMasterSnapshot> snapshotShuffleMaster() {
        Preconditions.checkState(this.context.getShuffleMaster().supportsBatchSnapshot());
        CompletableFuture<ShuffleMasterSnapshot> shuffleMasterSnapshotFuture = new CompletableFuture<ShuffleMasterSnapshot>();
        this.context.getShuffleMaster().snapshotState(shuffleMasterSnapshotFuture, new DefaultShuffleMasterSnapshotContext(), this.jobId);
        return shuffleMasterSnapshotFuture;
    }

    private void determineVerticesForResetAfterRestoreOpCoordinator() throws Exception {
        HashSet<ExecutionVertexID> verticesToReset = new HashSet<ExecutionVertexID>();
        for (ExecutionJobVertex jobVertex : this.context.getExecutionGraph().getAllVertices().values()) {
            if (!jobVertex.isInitialized() || jobVertex.getOperatorCoordinators().isEmpty()) continue;
            boolean allSupportsBatchSnapshot = jobVertex.getOperatorCoordinators().stream().allMatch(holder -> holder.coordinator().supportsBatchSnapshot());
            Set unfinishedTasks = Arrays.stream(jobVertex.getTaskVertices()).filter(vertex -> vertex.getExecutionState() != ExecutionState.FINISHED).map(executionVertex -> {
                executionVertex.getCurrentExecutionAttempt().transitionState(ExecutionState.CANCELED);
                return executionVertex.getID();
            }).collect(Collectors.toSet());
            if (allSupportsBatchSnapshot) {
                this.log.info("All operator coordinators of jobVertex {} support batch snapshot, add {} unfinished tasks to revise.", (Object)jobVertex.getName(), (Object)unfinishedTasks.size());
                verticesToReset.addAll(unfinishedTasks);
                continue;
            }
            if (unfinishedTasks.isEmpty()) {
                this.log.info("JobVertex {} is finished, but not all of its operator coordinators support batch snapshot. Therefore, if any single task within it requires a restart in the future, all tasks associated with this JobVertex need to be restarted as well.", (Object)jobVertex.getName());
                this.jobVerticesWithUnRecoveredCoordinators.add(jobVertex.getJobVertexId());
                continue;
            }
            this.log.info("Restart all tasks of jobVertex {} because it has not been finished and not all of its operator coordinators support batch snapshot.", (Object)jobVertex.getName());
            verticesToReset.addAll(Arrays.stream(jobVertex.getTaskVertices()).map(ExecutionVertex::getID).collect(Collectors.toSet()));
        }
        this.resetVerticesInRecovering(verticesToReset, false);
    }

    private void resetVerticesInRecovering(Set<ExecutionVertexID> nextVertices, boolean baseOnResultPartitionConsumable) throws Exception {
        Preconditions.checkState(this.isRecovering());
        HashSet<ExecutionVertexID> verticesToRestart = new HashSet<ExecutionVertexID>();
        while (!nextVertices.isEmpty()) {
            for (ExecutionVertexID executionVertexId : nextVertices) {
                if (verticesToRestart.contains(executionVertexId)) continue;
                verticesToRestart.addAll(this.context.getTasksNeedingRestart(executionVertexId, baseOnResultPartitionConsumable));
            }
            Set extraNeedToRestartJobVertices = verticesToRestart.stream().map(ExecutionVertexID::getJobVertexId).filter(this.jobVerticesWithUnRecoveredCoordinators::contains).collect(Collectors.toSet());
            this.jobVerticesWithUnRecoveredCoordinators.removeAll(extraNeedToRestartJobVertices);
            nextVertices = extraNeedToRestartJobVertices.stream().flatMap(jobVertexId -> {
                ExecutionJobVertex jobVertex = this.getExecutionJobVertex((JobVertexID)jobVertexId);
                return Arrays.stream(jobVertex.getTaskVertices()).map(ExecutionVertex::getID);
            }).collect(Collectors.toSet());
        }
        Set<ExecutionVertexID> verticesToReset = verticesToRestart.stream().filter(executionVertexID -> this.getExecutionVertex((ExecutionVertexID)executionVertexID).getExecutionState() != ExecutionState.CREATED).collect(Collectors.toSet());
        this.context.resetVerticesInRecovering(verticesToReset);
    }

    private void recoverFailed() {
        String message = String.format("Job %s recover failed from JM failover, fail global.", this.context.getExecutionGraph().getJobID());
        this.log.warn(message);
        this.context.getExecutionGraph().transitionState(JobStatus.RECONCILING, JobStatus.RUNNING);
        this.jobEventManager.stop(true);
        try {
            this.jobEventManager.start();
        }
        catch (Throwable throwable) {
            this.context.failJob(throwable, System.currentTimeMillis(), FailureEnricherUtils.EMPTY_FAILURE_LABELS);
            return;
        }
        this.context.onRecoveringFailed();
    }

    private void recoverFinished() {
        this.log.info("Job {} successfully recovered from JM failover", (Object)this.context.getExecutionGraph().getJobID());
        this.context.getExecutionGraph().transitionState(JobStatus.RECONCILING, JobStatus.RUNNING);
        this.checkExecutionGraphState();
        this.context.onRecoveringFinished(this.jobVerticesWithUnRecoveredCoordinators);
    }

    private void checkExecutionGraphState() {
        for (ExecutionVertex executionVertex : this.context.getExecutionGraph().getAllExecutionVertices()) {
            ExecutionState state = executionVertex.getExecutionState();
            Preconditions.checkState(state == ExecutionState.CREATED || state == ExecutionState.FINISHED);
        }
    }

    private CompletableFuture<Void> recoverPartitions() {
        this.context.getMainThreadExecutor().assertRunningInMainThread();
        CompletableFuture<Tuple2<ReconcileResult, Collection<PartitionWithMetrics>>> reconcilePartitionsFuture = this.reconcilePartitions();
        return reconcilePartitionsFuture.thenAccept(tuple2 -> {
            ReconcileResult reconcileResult = (ReconcileResult)tuple2.f0;
            Collection partitionWithMetrics = (Collection)tuple2.f1;
            this.log.info("Partitions to be released: {}, missed partitions: {}, partitions to be reserved: {}.", new Object[]{reconcileResult.partitionsToRelease, reconcileResult.partitionsMissing, reconcileResult.partitionsToReserve});
            ((InternalExecutionGraphAccessor)((Object)this.context.getExecutionGraph())).getPartitionTracker().stopTrackingAndReleasePartitions(reconcileResult.partitionsToRelease);
            HashMap<IntermediateResultPartitionID, ResultPartitionBytes> availablePartitionBytes = new HashMap<IntermediateResultPartitionID, ResultPartitionBytes>();
            partitionWithMetrics.stream().filter(partitionAndMetric -> reconcileResult.partitionsToReserve.contains(partitionAndMetric.getPartition().getResultPartitionID())).forEach(partitionAndMetric -> {
                ShuffleDescriptor shuffleDescriptor = partitionAndMetric.getPartition();
                ResourceID producerTaskExecutorId = UNKNOWN_PRODUCER;
                if (shuffleDescriptor.storesLocalResourcesOn().isPresent()) {
                    producerTaskExecutorId = shuffleDescriptor.storesLocalResourcesOn().get();
                }
                IntermediateResultPartition partition = this.context.getExecutionGraph().getResultPartitionOrThrow(shuffleDescriptor.getResultPartitionID().getPartitionId());
                ((InternalExecutionGraphAccessor)((Object)this.context.getExecutionGraph())).getPartitionTracker().startTrackingPartition(producerTaskExecutorId, Execution.createResultPartitionDeploymentDescriptor(partition, shuffleDescriptor));
                availablePartitionBytes.put(shuffleDescriptor.getResultPartitionID().getPartitionId(), partitionAndMetric.getPartitionMetrics().getPartitionBytes());
            });
            HashMap<ExecutionVertexID, Map> allDescriptors = new HashMap<ExecutionVertexID, Map>();
            ((InternalExecutionGraphAccessor)((Object)this.context.getExecutionGraph())).getPartitionTracker().getAllTrackedNonClusterPartitions().forEach(descriptor -> {
                ExecutionVertexID vertexId = descriptor.getShuffleDescriptor().getResultPartitionID().getProducerId().getExecutionVertexId();
                if (!allDescriptors.containsKey(vertexId)) {
                    allDescriptors.put(vertexId, new HashMap());
                }
                ((Map)allDescriptors.get(vertexId)).put(descriptor.getPartitionId(), descriptor);
            });
            allDescriptors.forEach((vertexId, descriptors) -> this.getExecutionVertex((ExecutionVertexID)vertexId).getCurrentExecutionAttempt().recoverProducedPartitions((Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor>)descriptors));
            this.context.updateResultPartitionBytesMetrics(availablePartitionBytes);
            Set<ExecutionVertexID> missingPartitionVertices = reconcileResult.partitionsMissing.stream().map(ResultPartitionID::getPartitionId).map(this::getProducer).map(ExecutionVertex::getID).collect(Collectors.toSet());
            try {
                this.resetVerticesInRecovering(missingPartitionVertices, true);
            }
            catch (Exception e) {
                throw new CompletionException(e);
            }
        });
    }

    private CompletableFuture<Tuple2<ReconcileResult, Collection<PartitionWithMetrics>>> reconcilePartitions() {
        List partitions = this.context.getExecutionGraph().getAllIntermediateResults().values().stream().flatMap(result -> Arrays.stream(result.getPartitions())).collect(Collectors.toList());
        HashSet<ResultPartitionID> partitionsToReserve = new HashSet<ResultPartitionID>();
        HashSet<ResultPartitionID> partitionsToRelease = new HashSet<ResultPartitionID>();
        for (IntermediateResultPartition partition : partitions) {
            PartitionReservationStatus reserveStatus = this.getPartitionReservationStatus(partition);
            if (reserveStatus.equals((Object)PartitionReservationStatus.RESERVE)) {
                partitionsToReserve.add(this.createResultPartitionId(partition.getPartitionId()));
                continue;
            }
            if (!reserveStatus.equals((Object)PartitionReservationStatus.RELEASE)) continue;
            partitionsToRelease.add(this.createResultPartitionId(partition.getPartitionId()));
        }
        CompletableFuture<Collection<PartitionWithMetrics>> fetchPartitionsFuture = this.context.getShuffleMaster().getPartitionWithMetrics(this.context.getExecutionGraph().getJobID(), this.previousWorkerRecoveryTimeout, partitionsToReserve);
        return fetchPartitionsFuture.thenApplyAsync(partitionWithMetrics -> {
            Set actualPartitions = partitionWithMetrics.stream().map(PartitionWithMetrics::getPartition).map(ShuffleDescriptor::getResultPartitionID).collect(Collectors.toSet());
            Sets.SetView<ResultPartitionID> actualpartitionsToRelease = Sets.intersection(partitionsToRelease, actualPartitions);
            Sets.SetView<ResultPartitionID> actualpartitionsMissing = Sets.difference(partitionsToReserve, actualPartitions);
            Sets.SetView<ResultPartitionID> actualpartitionsToReserve = Sets.intersection(partitionsToReserve, actualPartitions);
            return Tuple2.of(new ReconcileResult(actualpartitionsToRelease, actualpartitionsMissing, actualpartitionsToReserve), partitionWithMetrics);
        }, (Executor)this.context.getMainThreadExecutor());
    }

    private ResultPartitionID createResultPartitionId(IntermediateResultPartitionID partitionId) {
        Execution producer = this.getProducer(partitionId).getPartitionProducer();
        return new ResultPartitionID(partitionId, producer.getAttemptId());
    }

    private ExecutionVertex getProducer(IntermediateResultPartitionID partitionId) {
        return this.context.getExecutionGraph().getResultPartitionOrThrow(partitionId).getProducer();
    }

    private PartitionReservationStatus getPartitionReservationStatus(IntermediateResultPartition partition) {
        boolean isProducerFinished;
        ExecutionVertex producer = this.getProducer(partition.getPartitionId());
        boolean bl = isProducerFinished = producer.getExecutionState() == ExecutionState.FINISHED;
        if (!isProducerFinished) {
            return PartitionReservationStatus.RELEASE;
        }
        boolean allConsumersInitialized = partition.getIntermediateResult().getConsumerVertices().stream().allMatch(jobVertexId -> this.getExecutionJobVertex((JobVertexID)jobVertexId).isInitialized());
        if (!allConsumersInitialized) {
            return PartitionReservationStatus.RESERVE;
        }
        return this.getConsumers(partition.getPartitionId()).stream().anyMatch(vertex -> vertex.getExecutionState() != ExecutionState.FINISHED) ? PartitionReservationStatus.RESERVE : PartitionReservationStatus.OPTIONAL;
    }

    private List<ExecutionVertex> getConsumers(IntermediateResultPartitionID partitionId) {
        List<ConsumerVertexGroup> consumerVertexGroups = this.context.getExecutionGraph().getResultPartitionOrThrow(partitionId).getConsumerVertexGroups();
        ArrayList<ExecutionVertex> executionVertices = new ArrayList<ExecutionVertex>();
        for (ConsumerVertexGroup group : consumerVertexGroups) {
            for (ExecutionVertexID executionVertexID : group) {
                executionVertices.add(this.getExecutionVertex(executionVertexID));
            }
        }
        return executionVertices;
    }

    private ExecutionVertex getExecutionVertex(ExecutionVertexID executionVertexId) {
        return this.context.getExecutionGraph().getAllVertices().get(executionVertexId.getJobVertexId()).getTaskVertices()[executionVertexId.getSubtaskIndex()];
    }

    private ExecutionJobVertex getExecutionJobVertex(JobVertexID jobVertexId) {
        return this.context.getExecutionGraph().getAllVertices().get(jobVertexId);
    }

    private static class ReconcileResult {
        private final Set<ResultPartitionID> partitionsToRelease;
        private final Set<ResultPartitionID> partitionsMissing;
        private final Set<ResultPartitionID> partitionsToReserve;

        ReconcileResult(Set<ResultPartitionID> partitionsToRelease, Set<ResultPartitionID> partitionsMissing, Set<ResultPartitionID> partitionsToReserve) {
            this.partitionsToRelease = Preconditions.checkNotNull(partitionsToRelease);
            this.partitionsMissing = Preconditions.checkNotNull(partitionsMissing);
            this.partitionsToReserve = Preconditions.checkNotNull(partitionsToReserve);
        }
    }

    private static enum PartitionReservationStatus {
        RELEASE,
        RESERVE,
        OPTIONAL;

    }
}

