/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.LocalMemberResetException;
import com.hazelcast.core.Member;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.ClusterServiceImpl;
import com.hazelcast.internal.cluster.impl.MembersView;
import com.hazelcast.jet.RestartableException;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.impl.ExplodeSnapshotP;
import com.hazelcast.jet.impl.JobCoordinationService;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.SnapshotRepository;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.JobTerminateRequestedException;
import com.hazelcast.jet.impl.exception.ShutdownInProgressException;
import com.hazelcast.jet.impl.exception.TerminatedWithSnapshotException;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.execution.init.ExecutionPlanBuilder;
import com.hazelcast.jet.impl.operation.CompleteExecutionOperation;
import com.hazelcast.jet.impl.operation.InitExecutionOperation;
import com.hazelcast.jet.impl.operation.SnapshotOperation;
import com.hazelcast.jet.impl.operation.StartExecutionOperation;
import com.hazelcast.jet.impl.operation.TerminateExecutionOperation;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class MasterContext {
    public static final int SNAPSHOT_RESTORE_EDGE_PRIORITY = Integer.MIN_VALUE;
    public static final String SNAPSHOT_VERTEX_PREFIX = "__snapshot_";
    private static final Object NULL_OBJECT = new Object(){

        public String toString() {
            return "NULL_OBJECT";
        }
    };
    private final Object lock = new Object();
    private final NodeEngineImpl nodeEngine;
    private final JobCoordinationService coordinationService;
    private final ILogger logger;
    private final long jobId;
    private final String jobName;
    private final SnapshotRepository snapshotRepository;
    private volatile JobRecord jobRecord;
    private volatile JobStatus jobStatus = JobStatus.NOT_RUNNING;
    private volatile Set<Vertex> vertices;
    private volatile long executionId;
    private volatile long executionStartTime;
    private volatile Map<MemberInfo, ExecutionPlan> executionPlanMap;
    private volatile ExecutionInvocationCallback executionInvocationCallback;
    private final NonCompletableFuture completionFuture = new NonCompletableFuture();
    private volatile TerminationMode requestedTerminationMode;
    private boolean snapshotInProgress;
    private volatile boolean nextSnapshotIsTerminal;
    private CompletableFuture<Void> terminalSnapshotFuture;

    MasterContext(NodeEngineImpl nodeEngine, JobCoordinationService coordinationService, JobRecord jobRecord) {
        this.nodeEngine = nodeEngine;
        this.coordinationService = coordinationService;
        this.snapshotRepository = coordinationService.snapshotRepository();
        this.logger = nodeEngine.getLogger(this.getClass());
        this.jobRecord = jobRecord;
        this.jobId = jobRecord.getJobId();
        this.jobName = jobRecord.getJobNameOrId();
        if (jobRecord.isSuspended()) {
            this.jobStatus = JobStatus.SUSPENDED;
        }
    }

    public long jobId() {
        return this.jobId;
    }

    public long executionId() {
        return this.executionId;
    }

    public JobStatus jobStatus() {
        return this.jobStatus;
    }

    public JobConfig jobConfig() {
        return this.jobRecord.getConfig();
    }

    public JobRecord jobRecord() {
        return this.jobRecord;
    }

    public CompletableFuture<Void> completionFuture() {
        return this.completionFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    boolean requestTermination(TerminationMode mode) {
        JobStatus localStatus;
        this.assertLockNotHeld();
        Object object = this.lock;
        synchronized (object) {
            if (!this.isSnapshottingEnabled()) {
                mode = mode.withoutTerminalSnapshot();
            }
            if ((localStatus = this.jobStatus()) == JobStatus.SUSPENDED && mode != TerminationMode.CANCEL) {
                return false;
            }
            if (this.requestedTerminationMode != null) {
                return false;
            }
            this.requestedTerminationMode = mode;
            if (localStatus == JobStatus.SUSPENDED) {
                this.jobStatus = JobStatus.COMPLETED;
                this.setFinalResult(new CancellationException());
            }
        }
        if (localStatus == JobStatus.SUSPENDED) {
            this.coordinationService.completeJob(this, System.currentTimeMillis(), new CancellationException());
        } else if (localStatus == JobStatus.RUNNING || localStatus == JobStatus.STARTING) {
            this.handleTermination(mode);
        }
        return true;
    }

    boolean isCancelled() {
        return this.requestedTerminationMode == TerminationMode.CANCEL;
    }

    TerminationMode requestedTerminationMode() {
        return this.requestedTerminationMode;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void tryStartJob(Function<Long, Long> executionIdSupplier) {
        String dotString;
        Exception exception;
        DAG dag;
        ClassLoader classLoader;
        block21: {
            classLoader = null;
            dag = null;
            exception = null;
            dotString = null;
            this.assertLockNotHeld();
            Object object = this.lock;
            synchronized (object) {
                if (this.isCancelled()) {
                    this.logger.fine("Skipping init job '" + this.jobName + "': is already cancelled.");
                    exception = new CancellationException();
                    break block21;
                }
                if (!this.setJobStatusToStarting() || this.scheduleRestartIfQuorumAbsent() || this.scheduleRestartIfClusterIsNotSafe()) {
                    return;
                }
                if (this.requestedTerminationMode != null) {
                    if (this.requestedTerminationMode.actionAfterTerminate() != TerminationMode.ActionAfterTerminate.RESTART) {
                        exception = new JobTerminateRequestedException(this.requestedTerminationMode);
                        break block21;
                    }
                    this.requestedTerminationMode = null;
                }
                classLoader = this.coordinationService.getJetService().getClassLoader(this.jobId);
                try {
                    dag = (DAG)CustomClassLoadedObject.deserializeWithCustomClassLoader(this.nodeEngine.getSerializationService(), classLoader, this.jobRecord.getDag());
                }
                catch (Exception e) {
                    this.logger.warning("DAG deserialization failed", e);
                    exception = e;
                    break block21;
                }
                this.vertices = new HashSet<Vertex>();
                dotString = dag.toDotString();
                dag.iterator().forEachRemaining(this.vertices::add);
                this.executionId = executionIdSupplier.apply(this.jobId);
                this.snapshotInProgress = false;
                this.nextSnapshotIsTerminal = false;
                this.terminalSnapshotFuture = new CompletableFuture();
            }
        }
        if (exception != null) {
            this.finalizeJob(exception);
            return;
        }
        long lastSnapshotId = -1L;
        if (this.isSnapshottingEnabled()) {
            Long snapshotIdToRestore = this.snapshotRepository.latestCompleteSnapshot(this.jobId);
            try {
                this.snapshotRepository.deleteAllSnapshotsExceptOne(this.jobId, snapshotIdToRestore);
            }
            catch (Exception e) {
                this.logger.warning("Cannot delete old snapshots for " + this.jobName, e);
            }
            Long lastStartedSnapshot = this.snapshotRepository.latestStartedSnapshot(this.jobId);
            if (snapshotIdToRestore != null) {
                this.logger.info("State of " + this.jobIdString() + " will be restored from snapshot " + snapshotIdToRestore);
                this.rewriteDagWithSnapshotRestore(dag, snapshotIdToRestore);
            } else {
                this.logger.info("No previous snapshot for " + this.jobIdString() + " found.");
            }
            if (lastStartedSnapshot != null) {
                lastSnapshotId = lastStartedSnapshot;
            }
        }
        MembersView membersView = this.getMembersView();
        ClassLoader previousCL = MasterContext.swapContextClassLoader(classLoader);
        try {
            this.logger.info("Start executing " + this.jobIdString() + ", status " + (Object)((Object)this.jobStatus()) + ", execution graph in DOT format:\n" + dotString + "\nHINT: You can use graphviz or http://viz-js.com to visualize the printed graph.");
            this.logger.fine("Building execution plan for " + this.jobIdString());
            this.executionPlanMap = ExecutionPlanBuilder.createExecutionPlans(this.nodeEngine, membersView, dag, this.jobId, this.executionId, this.jobConfig(), lastSnapshotId);
        }
        catch (Exception e) {
            this.logger.severe("Exception creating execution plan for " + this.jobIdString(), e);
            this.finalizeJob(e);
            return;
        }
        finally {
            Thread.currentThread().setContextClassLoader(previousCL);
        }
        this.logger.fine("Built execution plans for " + this.jobIdString());
        Set<MemberInfo> participants = this.executionPlanMap.keySet();
        Function<ExecutionPlan, Operation> operationCtor = plan -> new InitExecutionOperation(this.jobId, this.executionId, membersView.getVersion(), participants, (Data)this.nodeEngine.getSerializationService().toData(plan));
        this.invokeOnParticipants(operationCtor, this::onInitStepCompleted, null);
    }

    private void rewriteDagWithSnapshotRestore(DAG dag, long snapshotId) {
        this.logger.info(this.jobIdString() + ": restoring state from snapshotId=" + snapshotId);
        for (Vertex vertex : dag) {
            String mapName = SnapshotRepository.snapshotDataMapName(this.jobId, snapshotId, vertex.getName());
            Vertex readSnapshotVertex = dag.newVertex("__snapshot_read." + vertex.getName(), SourceProcessors.readMapP(mapName));
            Vertex explodeVertex = dag.newVertex("__snapshot_explode." + vertex.getName(), ExplodeSnapshotP::new);
            readSnapshotVertex.localParallelism(vertex.getLocalParallelism());
            explodeVertex.localParallelism(vertex.getLocalParallelism());
            int destOrdinal = dag.getInboundEdges(vertex.getName()).size();
            dag.edge(Edge.between(readSnapshotVertex, explodeVertex).isolated()).edge(new SnapshotRestoreEdge(explodeVertex, vertex, destOrdinal));
        }
    }

    private boolean setJobStatusToStarting() {
        this.assertLockHeld();
        JobStatus status = this.jobStatus();
        if (status != JobStatus.NOT_RUNNING) {
            this.logger.fine("Not starting job '" + this.jobName + "': status is " + (Object)((Object)status));
            return false;
        }
        assert (this.jobStatus == JobStatus.NOT_RUNNING) : "cannot start job " + com.hazelcast.jet.Util.idToString(this.jobId) + " with status: " + (Object)((Object)this.jobStatus);
        this.jobStatus = JobStatus.STARTING;
        this.executionStartTime = System.nanoTime();
        this.jobRecord = this.jobRecord.withSuspended(false);
        return true;
    }

    private boolean scheduleRestartIfQuorumAbsent() {
        int quorumSize = this.jobRecord.getQuorumSize();
        if (this.coordinationService.isQuorumPresent(quorumSize)) {
            return false;
        }
        this.logger.fine("Rescheduling restart of job '" + this.jobName + "': quorum size " + quorumSize + " is not met");
        this.scheduleRestart();
        return true;
    }

    private boolean scheduleRestartIfClusterIsNotSafe() {
        if (this.coordinationService.shouldStartJobs()) {
            return false;
        }
        this.logger.fine("Rescheduling restart of job '" + this.jobName + "': cluster is not safe");
        this.scheduleRestart();
        return true;
    }

    private void scheduleRestart() {
        this.assertLockHeld();
        if (this.jobStatus != JobStatus.NOT_RUNNING && this.jobStatus != JobStatus.STARTING && this.jobStatus != JobStatus.RUNNING) {
            throw new IllegalStateException("Restart scheduled in an unexpected state: " + (Object)((Object)this.jobStatus));
        }
        this.jobStatus = JobStatus.NOT_RUNNING;
        this.coordinationService.scheduleRestart(this.jobId);
    }

    private MembersView getMembersView() {
        ClusterServiceImpl clusterService = (ClusterServiceImpl)this.nodeEngine.getClusterService();
        return clusterService.getMembershipManager().getMembersView();
    }

    private void onInitStepCompleted(Map<MemberInfo, Object> responses) {
        JobStatus status;
        Throwable error = this.getResult("Init", responses);
        if (error == null && (status = this.jobStatus()) != JobStatus.STARTING) {
            error = new IllegalStateException("Cannot execute " + this.jobIdString() + ": status is " + (Object)((Object)status));
        }
        if (error == null) {
            this.invokeStartExecution();
        } else {
            this.invokeCompleteExecution(error);
        }
    }

    private Map<Boolean, List<Map.Entry<MemberInfo, Object>>> groupResponses(Map<MemberInfo, Object> responses) {
        Map<Boolean, List<Map.Entry<MemberInfo, Object>>> grouped = responses.entrySet().stream().collect(Collectors.partitioningBy(e -> e.getValue() instanceof Throwable));
        grouped.putIfAbsent(true, Collections.emptyList());
        grouped.putIfAbsent(false, Collections.emptyList());
        return grouped;
    }

    private void invokeStartExecution() {
        this.logger.fine("Executing " + this.jobIdString());
        long executionId = this.executionId;
        this.executionInvocationCallback = new ExecutionInvocationCallback(executionId);
        if (this.requestedTerminationMode != null) {
            this.handleTermination(this.requestedTerminationMode);
        }
        Function<ExecutionPlan, Operation> operationCtor = plan -> new StartExecutionOperation(this.jobId, executionId);
        Consumer<Map<MemberInfo, Object>> completionCallback = this::onExecuteStepCompleted;
        this.jobStatus = JobStatus.RUNNING;
        this.invokeOnParticipants(operationCtor, completionCallback, this.executionInvocationCallback);
        if (this.isSnapshottingEnabled()) {
            this.coordinationService.scheduleSnapshot(this.jobId, executionId);
        }
    }

    private void handleTermination(@Nonnull TerminationMode mode) {
        if (mode.isWithTerminalSnapshot()) {
            this.nextSnapshotIsTerminal = true;
            this.beginSnapshot(this.executionId);
        } else if (this.executionInvocationCallback != null) {
            this.executionInvocationCallback.cancelInvocations(mode);
        }
    }

    private void cancelExecutionInvocations(long jobId, long executionId, TerminationMode mode) {
        this.nodeEngine.getExecutionService().execute("hz:async", () -> this.invokeOnParticipants(plan -> new TerminateExecutionOperation(jobId, executionId, mode), null, null));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void beginSnapshot(long executionId) {
        boolean isTerminal;
        this.assertLockNotHeld();
        Object object = this.lock;
        synchronized (object) {
            if (this.executionId != executionId) {
                this.logger.fine("Not beginning snapshot since unexpected execution ID received for " + this.jobIdString() + ". Received execution ID: " + com.hazelcast.jet.Util.idToString(executionId));
                return;
            }
            if (this.jobStatus != JobStatus.RUNNING) {
                this.logger.fine("Not beginning snapshot, job is not RUNNING, but " + (Object)((Object)this.jobStatus));
                return;
            }
            if (this.snapshotInProgress) {
                this.logger.fine("Not beginning snapshot since one is already in progress " + this.jobIdString());
                return;
            }
            this.snapshotInProgress = true;
            isTerminal = this.nextSnapshotIsTerminal;
        }
        List<String> vertexNames = this.vertices.stream().map(Vertex::getName).collect(Collectors.toList());
        long newSnapshotId = this.snapshotRepository.registerSnapshot(this.jobId, vertexNames);
        this.logger.info(String.format("Starting%s snapshot %s for %s", isTerminal ? " terminal" : "", newSnapshotId, this.jobIdString()));
        Function<ExecutionPlan, Operation> factory = plan -> new SnapshotOperation(this.jobId, executionId, newSnapshotId, isTerminal);
        this.invokeOnParticipants(factory, responses -> this.onSnapshotCompleted((Map<MemberInfo, Object>)responses, executionId, newSnapshotId, isTerminal), null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onSnapshotCompleted(Map<MemberInfo, Object> responses, long executionId, long snapshotId, boolean wasTerminal) {
        boolean isSuccess;
        SnapshotOperation.SnapshotOperationResult mergedResult = new SnapshotOperation.SnapshotOperationResult();
        for (Object response : responses.values()) {
            if (response instanceof Throwable) {
                response = new SnapshotOperation.SnapshotOperationResult(0L, 0L, 0L, (Throwable)response);
            }
            mergedResult.merge((SnapshotOperation.SnapshotOperationResult)response);
        }
        boolean bl = isSuccess = mergedResult.getError() == null;
        if (!isSuccess) {
            this.logger.warning(this.jobIdString() + " snapshot " + snapshotId + " failed on some member(s), one of the failures: " + mergedResult.getError());
        }
        this.coordinationService.completeSnapshot(this.jobId, snapshotId, isSuccess, mergedResult.getNumBytes(), mergedResult.getNumKeys(), mergedResult.getNumChunks());
        Runnable nonSynchronizedAction = () -> {};
        Object object = this.lock;
        synchronized (object) {
            if (this.executionId != executionId) {
                this.logger.fine("Not completing terminalSnapshotFuture on " + this.jobIdString() + ", new execution already started, snapshot was for executionId=" + com.hazelcast.jet.Util.idToString(executionId));
                return;
            }
            assert (this.snapshotInProgress) : "snapshot not in progress";
            this.snapshotInProgress = false;
            if (wasTerminal) {
                boolean completedNow = this.terminalSnapshotFuture.complete(null);
                assert (completedNow) : "terminalSnapshotFuture was already completed";
            } else if (this.nextSnapshotIsTerminal) {
                nonSynchronizedAction = () -> this.coordinationService.beginSnapshot(this.jobId, executionId);
            } else {
                this.coordinationService.scheduleSnapshot(this.jobId, executionId);
            }
        }
        nonSynchronizedAction.run();
    }

    private void onExecuteStepCompleted(Map<MemberInfo, Object> responses) {
        this.invokeCompleteExecution(this.getResult("Execution", responses));
    }

    private Throwable getResult(String opName, Map<MemberInfo, Object> responses) {
        if (this.isCancelled()) {
            this.logger.fine(this.jobIdString() + " to be cancelled after " + opName);
            return new CancellationException();
        }
        Map<Boolean, List<Map.Entry<MemberInfo, Object>>> grouped = this.groupResponses(responses);
        Collection successfulMembers = grouped.get(false).stream().map(Map.Entry::getKey).collect(Collectors.toList());
        List<Map.Entry<MemberInfo, Object>> failures = grouped.get(true);
        if (!failures.isEmpty()) {
            this.logger.fine(opName + " of " + this.jobIdString() + " has failures: " + failures);
        }
        if (successfulMembers.size() == this.executionPlanMap.size()) {
            this.logger.fine(opName + " of " + this.jobIdString() + " was successful");
            return null;
        }
        if (failures.stream().allMatch(e -> e.getValue() instanceof TerminatedWithSnapshotException)) {
            assert (opName.equals("Execution")) : "opName=" + opName;
            this.logger.fine(opName + " of " + this.jobIdString() + " terminated after a terminal snapshot");
            TerminationMode mode = this.requestedTerminationMode;
            assert (mode != null && mode.isWithTerminalSnapshot()) : "mode=" + (Object)((Object)mode);
            return new JobTerminateRequestedException(mode);
        }
        return failures.stream().peek(e -> {
            if (e.getValue() instanceof ShutdownInProgressException) {
                this.coordinationService.addShuttingDownMember(((MemberInfo)e.getKey()).getUuid());
            }
        }).map(e -> (Throwable)e.getValue()).filter(t -> !(t instanceof CancellationException) && !(t instanceof TerminatedWithSnapshotException)).filter(t -> !ExceptionUtil.isRestartableException(t)).findFirst().map(ExceptionUtil::peel).orElseGet(TopologyChangedException::new);
    }

    private void invokeCompleteExecution(Throwable error) {
        Throwable finalError;
        JobStatus status = this.jobStatus();
        if (status == JobStatus.STARTING || status == JobStatus.RUNNING) {
            this.logger.fine("Completing " + this.jobIdString());
            finalError = error;
        } else {
            if (error != null) {
                this.logger.severe("Cannot properly complete failed " + this.jobIdString() + ": status is " + (Object)((Object)status), error);
            } else {
                this.logger.severe("Cannot properly complete " + this.jobIdString() + ": status is " + (Object)((Object)status));
            }
            finalError = new IllegalStateException("Job coordination failed.");
        }
        Function<ExecutionPlan, Operation> operationCtor = plan -> new CompleteExecutionOperation(this.executionId, finalError);
        this.invokeOnParticipants(operationCtor, responses -> this.onCompleteExecutionCompleted(error), null);
    }

    private void onCompleteExecutionCompleted(Throwable error) {
        if (error instanceof JobTerminateRequestedException && ((JobTerminateRequestedException)error).mode().isWithTerminalSnapshot()) {
            this.terminalSnapshotFuture.whenCompleteAsync((BiConsumer)ExceptionUtil.withTryCatch(this.logger, (r, e) -> this.finalizeJob(error)));
        } else {
            this.finalizeJob(error);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void finalizeJob(@Nullable Throwable failure) {
        Runnable nonSynchronizedAction = () -> {};
        this.assertLockNotHeld();
        Object object = this.lock;
        synchronized (object) {
            TerminationMode.ActionAfterTerminate terminationModeAction;
            boolean isSuccess;
            if (!this.checkJobNotDone(failure)) {
                return;
            }
            this.completeVertices(failure);
            long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.executionStartTime);
            boolean bl = isSuccess = failure == null || failure instanceof CancellationException || failure instanceof JobTerminateRequestedException;
            if (isSuccess) {
                if (failure != null) {
                    this.logger.info(String.format("Execution of %s completed in %,d ms, reason=%s", this.jobIdString(), elapsed, failure));
                } else {
                    this.logger.info(String.format("Execution of %s completed in %,d ms", this.jobIdString(), elapsed));
                }
            } else {
                this.logger.warning(String.format("Execution of %s failed after %,d ms", this.jobIdString(), elapsed), failure);
            }
            this.requestedTerminationMode = null;
            this.executionInvocationCallback = null;
            TerminationMode.ActionAfterTerminate actionAfterTerminate = terminationModeAction = failure instanceof JobTerminateRequestedException ? ((JobTerminateRequestedException)failure).mode().actionAfterTerminate() : null;
            if (terminationModeAction == TerminationMode.ActionAfterTerminate.RESTART) {
                this.jobStatus = JobStatus.NOT_RUNNING;
                nonSynchronizedAction = () -> this.coordinationService.restartJob(this.jobId);
            } else if ((failure instanceof RestartableException || failure instanceof TopologyChangedException) && this.jobRecord.getConfig().isAutoScaling()) {
                this.scheduleRestart();
            } else if (terminationModeAction == TerminationMode.ActionAfterTerminate.SUSPEND || (failure instanceof RestartableException || failure instanceof TopologyChangedException) && !this.jobRecord.getConfig().isAutoScaling() && this.jobRecord.getConfig().getProcessingGuarantee() != ProcessingGuarantee.NONE) {
                this.jobStatus = JobStatus.SUSPENDED;
                this.jobRecord = this.jobRecord.withSuspended(true);
                nonSynchronizedAction = () -> this.coordinationService.suspendJob(this);
            } else {
                JobStatus jobStatus = this.jobStatus = isSuccess ? JobStatus.COMPLETED : JobStatus.FAILED;
                if (failure instanceof LocalMemberResetException) {
                    this.logger.fine("Cancelling job " + this.jobIdString() + " locally: member (local or remote) reset. We don't delete job metadata: job will restart on majority cluster");
                    this.setFinalResult(new CancellationException());
                    return;
                }
                nonSynchronizedAction = () -> {
                    try {
                        this.coordinationService.completeJob(this, System.currentTimeMillis(), failure);
                    }
                    catch (RuntimeException e) {
                        this.logger.warning("Completion of " + this.jobIdString() + " failed", e);
                    }
                    finally {
                        this.setFinalResult(failure);
                    }
                };
            }
        }
        nonSynchronizedAction.run();
    }

    private boolean checkJobNotDone(@Nullable Throwable failure) {
        JobStatus status = this.jobStatus();
        if (status == JobStatus.COMPLETED || status == JobStatus.FAILED) {
            if (failure != null) {
                this.logger.severe("Ignoring failure completion of " + com.hazelcast.jet.Util.idToString(this.jobId) + " because status is " + (Object)((Object)status), failure);
            } else {
                this.logger.severe("Ignoring completion of " + com.hazelcast.jet.Util.idToString(this.jobId) + " because status is " + (Object)((Object)status));
            }
            return false;
        }
        return true;
    }

    private void completeVertices(@Nullable Throwable failure) {
        if (this.vertices != null) {
            for (Vertex vertex : this.vertices) {
                try {
                    vertex.getMetaSupplier().close(failure);
                }
                catch (Exception e) {
                    this.logger.severe(this.jobIdString() + " encountered an exception in ProcessorMetaSupplier.complete(), ignoring it", e);
                }
            }
        }
    }

    void setFinalResult(Throwable failure) {
        if (failure == null) {
            this.completionFuture.internalComplete();
        } else {
            this.completionFuture.internalCompleteExceptionally(failure);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void updateQuorumSize(int newQuorumSize) {
        Object object = this.lock;
        synchronized (object) {
            this.jobRecord = this.jobRecord.withQuorumSize(newQuorumSize);
        }
    }

    private void invokeOnParticipants(Function<ExecutionPlan, Operation> operationCtor, @Nullable Consumer<Map<MemberInfo, Object>> completionCallback, @Nullable ExecutionCallback<Object> callback) {
        ConcurrentHashMap responses = new ConcurrentHashMap();
        AtomicInteger remainingCount = new AtomicInteger(this.executionPlanMap.size());
        for (Map.Entry<MemberInfo, ExecutionPlan> entry : this.executionPlanMap.entrySet()) {
            MemberInfo member = entry.getKey();
            Operation op = operationCtor.apply(entry.getValue());
            InternalCompletableFuture future = this.nodeEngine.getOperationService().createInvocationBuilder("hz:impl:jetService", op, member.getAddress()).invoke();
            if (completionCallback != null) {
                future.andThen(Util.callbackOf((r, throwable) -> {
                    Object response = r != null ? r : (throwable != null ? ExceptionUtil.peel(throwable) : NULL_OBJECT);
                    Object oldResponse = responses.put(member, response);
                    assert (oldResponse == null) : "Duplicate response for " + member + ". Old=" + oldResponse + ", new=" + response;
                    if (remainingCount.decrementAndGet() == 0) {
                        completionCallback.accept(responses);
                    }
                }));
            }
            if (callback == null) continue;
            future.andThen(callback);
        }
    }

    private boolean isSnapshottingEnabled() {
        return this.jobConfig().getProcessingGuarantee() != ProcessingGuarantee.NONE;
    }

    String jobIdString() {
        return Util.jobNameAndExecutionId(this.jobName, this.executionId);
    }

    private static ClassLoader swapContextClassLoader(ClassLoader jobClassLoader) {
        Thread currentThread = Thread.currentThread();
        ClassLoader previous = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(jobClassLoader);
        return previous;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void resumeJob(Function<Long, Long> executionIdSupplier) {
        Object object = this.lock;
        synchronized (object) {
            if (this.jobStatus != JobStatus.SUSPENDED) {
                this.logger.info("Not resuming " + this.jobIdString() + ": not " + (Object)((Object)JobStatus.SUSPENDED) + ", but " + (Object)((Object)this.jobStatus));
                return;
            }
            this.jobStatus = JobStatus.NOT_RUNNING;
        }
        this.logger.fine("Resuming job " + this.jobName);
        this.tryStartJob(executionIdSupplier);
    }

    private boolean hasParticipant(String uuid) {
        return this.executionPlanMap != null && this.executionPlanMap.keySet().stream().anyMatch(mi -> mi.getUuid().equals(uuid));
    }

    @Nullable
    CompletableFuture<Void> onParticipantGracefulShutdown(String uuid) {
        if (!this.hasParticipant(uuid)) {
            return null;
        }
        if (this.jobStatus() == JobStatus.SUSPENDED) {
            return null;
        }
        this.requestTermination(TerminationMode.RESTART_GRACEFUL);
        TerminationMode terminationMode = this.requestedTerminationMode;
        if (terminationMode != null && terminationMode.isWithTerminalSnapshot()) {
            return this.terminalSnapshotFuture;
        }
        return null;
    }

    boolean maybeScaleUp(Collection<Member> currentDataMembers) {
        if (!this.jobConfig().isAutoScaling()) {
            return true;
        }
        if (this.executionPlanMap == null || this.executionPlanMap.size() == currentDataMembers.size()) {
            LoggingUtil.logFine(this.logger, "Not scaling job %s up: not running or already running on all members", this.jobIdString());
            return true;
        }
        JobStatus localStatus = this.jobStatus;
        if (localStatus == JobStatus.RUNNING && this.requestTermination(TerminationMode.RESTART_GRACEFUL)) {
            this.logger.info("Requested restart of " + this.jobIdString() + " to make use of added member(s)");
            return true;
        }
        return false;
    }

    private void assertLockHeld() {
        assert (Thread.holdsLock(this.lock)) : "the lock should be held at this place";
    }

    private void assertLockNotHeld() {
        assert (!Thread.holdsLock(this.lock)) : "the lock should not be held at this place";
    }

    private class ExecutionInvocationCallback
    implements ExecutionCallback<Object> {
        private final AtomicBoolean invocationsCancelled = new AtomicBoolean();
        private final long executionId;

        ExecutionInvocationCallback(long executionId) {
            this.executionId = executionId;
        }

        @Override
        public void onResponse(Object response) {
        }

        @Override
        public void onFailure(Throwable t) {
            if (!(ExceptionUtil.peel(t) instanceof TerminatedWithSnapshotException)) {
                this.cancelInvocations(null);
            }
        }

        void cancelInvocations(TerminationMode mode) {
            if (this.invocationsCancelled.compareAndSet(false, true)) {
                MasterContext.this.cancelExecutionInvocations(MasterContext.this.jobId, this.executionId, mode);
            }
        }
    }

    private static class SnapshotRestoreEdge
    extends Edge {
        SnapshotRestoreEdge(Vertex source, Vertex destination, int destOrdinal) {
            super(source, 0, destination, destOrdinal);
            this.distributed();
            this.partitioned(DistributedFunctions.entryKey());
        }

        @Override
        public int getPriority() {
            return Integer.MIN_VALUE;
        }
    }
}

