/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.core.IMap;
import com.hazelcast.core.LocalMemberResetException;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.ClusterServiceImpl;
import com.hazelcast.internal.cluster.impl.MembersView;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.function.Functions;
import com.hazelcast.jet.impl.ExplodeSnapshotP;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.MasterContext;
import com.hazelcast.jet.impl.SnapshotValidator;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.JobTerminateRequestedException;
import com.hazelcast.jet.impl.exception.TerminatedWithSnapshotException;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.execution.init.ExecutionPlanBuilder;
import com.hazelcast.jet.impl.metrics.RawJobMetrics;
import com.hazelcast.jet.impl.operation.CompleteExecutionOperation;
import com.hazelcast.jet.impl.operation.GetLocalJobMetricsOperation;
import com.hazelcast.jet.impl.operation.InitExecutionOperation;
import com.hazelcast.jet.impl.operation.StartExecutionOperation;
import com.hazelcast.jet.impl.operation.TerminateExecutionOperation;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.NonCompletableFuture;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.Operation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class MasterJobContext {
    // Lowest possible int; the same value SnapshotRestoreEdge.getPriority() returns.
    public static final int SNAPSHOT_RESTORE_EDGE_PRIORITY = Integer.MIN_VALUE;
    // Common prefix of the "__snapshot_read" / "__snapshot_explode" vertices added by
    // rewriteDagWithSnapshotRestore().
    public static final String SNAPSHOT_VERTEX_PREFIX = "__snapshot_";
    // Milliseconds; completeWithMetrics() re-schedules collectMetrics() after a 100 ms delay.
    private static final int COLLECT_METRICS_RETRY_DELAY_MILLIS = 100;
    // Placeholder used by finalizeJob() when nothing has to run after the lock is released.
    private static final Runnable NO_OP = () -> {};
    private final MasterContext mc;
    private final ILogger logger;
    // Cooperative thread count read from the instance config in the constructor;
    // passed to DAG.toDotString() in tryStartJob().
    private final int defaultParallelism;
    // System.nanoTime() captured at construction and again at the beginning of tryStartJob();
    // isSuccess() uses it to compute the elapsed time.
    private volatile long executionStartTime = System.nanoTime();
    // Assigned in invokeStartExecution(), cleared in finalizeJob().
    private volatile ExecutionFailureCallback executionFailureCallback;
    // Vertices of the deserialized DAG, collected in resolveDagAndCL(); their meta-suppliers
    // are closed in completeVertices(). Null until the first DAG is resolved.
    private volatile Set<Vertex> vertices;
    // Last metrics stored by setJobMetrics(); returned by collectMetrics() when the job is not RUNNING.
    @Nonnull
    private volatile List<RawJobMetrics> jobMetrics = Collections.emptyList();
    // Replaced by a fresh future in resolveDagAndCL() and completed at the end of finalizeJob().
    @Nonnull
    private volatile CompletableFuture<Void> executionCompletionFuture = CompletableFuture.completedFuture(null);
    // Completed only through setFinalResult().
    private final NonCompletableFuture jobCompletionFuture = new NonCompletableFuture();
    // Set by requestTermination(); reset to null in finalizeJob() and, for restart modes, in resolveDagAndCL().
    private volatile TerminationMode requestedTerminationMode;

    /**
     * Creates the job context bound to the given master context.
     *
     * @param masterContext the owning master context; also the source of the
     *                      cooperative thread count stored as the default parallelism
     * @param logger        logger used for all messages of this context
     */
    MasterJobContext(MasterContext masterContext, ILogger logger) {
        mc = masterContext;
        this.logger = logger;
        defaultParallelism = mc.getJetService()
                .getJetInstance()
                .getConfig()
                .getInstanceConfig()
                .getCooperativeThreadCount();
    }

    /**
     * @return the future that is completed through {@link #setFinalResult(Throwable)}
     */
    public CompletableFuture<Void> jobCompletionFuture() {
        return jobCompletionFuture;
    }

    /**
     * @return the currently requested termination mode, or {@code null} if none is set
     */
    TerminationMode requestedTerminationMode() {
        return requestedTerminationMode;
    }

    /**
     * @return {@code true} iff the requested termination mode is
     *         {@code CANCEL_FORCEFUL}
     */
    private boolean isCancelled() {
        return TerminationMode.CANCEL_FORCEFUL == requestedTerminationMode;
    }

    /**
     * Submits a task to the coordinator thread that attempts to start the job:
     * it resolves the DAG and class loader, optionally rewrites the DAG to
     * restore from a snapshot, builds the execution plans and sends
     * {@code InitExecutionOperation} to all participants.
     * <p>
     * If {@link #resolveDagAndCL} returns {@code null} the start is silently
     * abandoned. A {@code UserCausedException} raised along the way finalizes
     * the job with the wrapped cause.
     *
     * @param executionIdSupplier supplies the id for the new execution
     */
    void tryStartJob(Supplier<Long> executionIdSupplier) {
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            this.executionStartTime = System.nanoTime();
            try {
                JobExecutionRecord jobExecRec = this.mc.jobExecutionRecord();
                jobExecRec.markExecuted();
                Tuple2<DAG, ClassLoader> dagAndClassloader = this.resolveDagAndCL(executionIdSupplier);
                if (dagAndClassloader == null) {
                    return;
                }
                DAG dag = dagAndClassloader.f0();
                ClassLoader classLoader = dagAndClassloader.f1();
                // Capture the DOT string before the DAG is possibly rewritten below.
                String dotRepresentation = dag.toDotString(this.defaultParallelism);
                long snapshotId = jobExecRec.snapshotId();
                String snapshotName = this.mc.jobConfig().getInitialSnapshotName();

                // Prefer the job's own successful snapshot; otherwise fall back to the
                // configured initial (exported) snapshot; otherwise restore nothing.
                String mapName;
                if (snapshotId >= 0L) {
                    mapName = jobExecRec.successfulSnapshotDataMapName(this.mc.jobId());
                } else if (snapshotName != null) {
                    mapName = "__jet.exportedSnapshot." + snapshotName;
                } else {
                    mapName = null;
                }

                if (mapName != null) {
                    this.rewriteDagWithSnapshotRestore(dag, snapshotId, mapName, snapshotName);
                } else {
                    this.logger.info("Didn't find any snapshot to restore for " + this.mc.jobIdString());
                }
                MembersView membersView = this.getMembersView();
                this.logger.info("Start executing " + this.mc.jobIdString() + ", execution graph in DOT format:\n" + dotRepresentation + "\nHINT: You can use graphviz or http://viz-js.com to visualize the printed graph.");
                this.logger.fine("Building execution plan for " + this.mc.jobIdString());
                try {
                    Util.doWithClassLoader(classLoader, () -> this.mc.setExecutionPlanMap(ExecutionPlanBuilder.createExecutionPlans(this.mc.nodeEngine(), membersView, dag, this.mc.jobId(), this.mc.executionId(), this.mc.jobConfig(), jobExecRec.ongoingSnapshotId())));
                }
                catch (Exception e) {
                    // Any failure while building the plans is reported as the job's failure.
                    throw new UserCausedException(e);
                }
                this.logger.fine("Built execution plans for " + this.mc.jobIdString());
                Set<MemberInfo> participants = this.mc.executionPlanMap().keySet();
                Function<ExecutionPlan, Operation> operationCtor = plan -> new InitExecutionOperation(this.mc.jobId(), this.mc.executionId(), membersView.getVersion(), participants, (Data)this.mc.nodeEngine().getSerializationService().toData(plan));
                this.mc.invokeOnParticipants(operationCtor, this::onInitStepCompleted, null, false);
            }
            catch (UserCausedException e) {
                this.finalizeJob(e.getCause());
            }
        });
    }

    /**
     * Under the master-context lock, checks whether the job may start now and,
     * if so, moves it to {@code STARTING}, deserializes the DAG with the job's
     * class loader, assigns a new execution id and creates a fresh execution
     * completion future.
     *
     * @param executionIdSupplier supplies the id for the new execution
     * @return the DAG with its class loader, or {@code null} if the job is not
     *         in {@code NOT_RUNNING} state or a restart was scheduled instead
     * @throws UserCausedException if the job was cancelled, a non-restart
     *         termination is pending, or the DAG fails to deserialize
     */
    @Nullable
    private Tuple2<DAG, ClassLoader> resolveDagAndCL(Supplier<Long> executionIdSupplier) throws UserCausedException {
        mc.lock();
        try {
            if (isCancelled()) {
                logger.fine("Skipping init job '" + mc.jobName() + "': is already cancelled.");
                throw new UserCausedException(new CancellationException());
            }
            if (mc.jobStatus() != JobStatus.NOT_RUNNING) {
                logger.fine("Not starting job '" + mc.jobName() + "': status is " + mc.jobStatus());
                return null;
            }
            if (mc.jobExecutionRecord().isSuspended()) {
                // Clear the persisted suspended flag before starting.
                mc.jobExecutionRecord().setSuspended(false);
                mc.writeJobExecutionRecord(false);
                mc.setJobStatus(JobStatus.NOT_RUNNING);
            }
            if (scheduleRestartIfQuorumAbsent() || scheduleRestartIfClusterIsNotSafe()) {
                return null;
            }

            mc.setJobStatus(JobStatus.STARTING);
            mc.writeJobExecutionRecord(true);

            if (requestedTerminationMode != null) {
                // Only a pending restart request may be dropped; anything else aborts the start.
                if (requestedTerminationMode.actionAfterTerminate() != TerminationMode.ActionAfterTerminate.RESTART) {
                    throw new UserCausedException(new JobTerminateRequestedException(requestedTerminationMode));
                }
                requestedTerminationMode = null;
            }

            ClassLoader jobClassLoader = mc.getJetService().getClassLoader(mc.jobId());
            DAG deserializedDag;
            try {
                deserializedDag = (DAG) CustomClassLoadedObject.deserializeWithCustomClassLoader(
                        mc.nodeEngine().getSerializationService(), jobClassLoader, mc.jobRecord().getDag());
            } catch (Exception e) {
                logger.warning("DAG deserialization failed", e);
                throw new UserCausedException(e);
            }

            // Remember the vertices so completeVertices() can close their meta-suppliers.
            vertices = new HashSet<>();
            deserializedDag.iterator().forEachRemaining(vertices::add);

            mc.setExecutionId(executionIdSupplier.get());
            mc.snapshotContext().onExecutionStarted();
            executionCompletionFuture = new CompletableFuture<>();
            return Tuple2.tuple2(deserializedDag, jobClassLoader);
        } finally {
            mc.unlock();
        }
    }

    /**
     * Requests termination of the job in the given mode. Must be called on the
     * coordinator thread.
     * <p>
     * When the processing guarantee is {@code NONE}, every mode except
     * {@code CANCEL_GRACEFUL} is replaced by its variant without a terminal
     * snapshot.
     *
     * @param mode                        the requested termination mode
     * @param allowWhileExportingSnapshot whether the request is accepted while the
     *                                    job status is {@code SUSPENDED_EXPORTING_SNAPSHOT}
     * @return a tuple of the current execution completion future and a message;
     *         the message is {@code null} when the request was accepted (or when a
     *         forceful cancellation is repeated), otherwise it says why the request
     *         was refused
     */
    @Nonnull
    Tuple2<CompletableFuture<Void>, String> requestTermination(TerminationMode mode, boolean allowWhileExportingSnapshot) {
        this.mc.coordinationService().assertOnCoordinatorThread();
        if (this.mc.jobConfig().getProcessingGuarantee() == ProcessingGuarantee.NONE && mode != TerminationMode.CANCEL_GRACEFUL) {
            mode = mode.withoutTerminalSnapshot();
        }

        JobStatus localStatus;
        Tuple2<CompletableFuture<Void>, String> result;
        this.mc.lock();
        try {
            localStatus = this.mc.jobStatus();
            if (localStatus == JobStatus.SUSPENDED_EXPORTING_SNAPSHOT && !allowWhileExportingSnapshot) {
                return Tuple2.tuple2(this.executionCompletionFuture, "Cannot cancel when job status is " + JobStatus.SUSPENDED_EXPORTING_SNAPSHOT);
            }
            if (localStatus == JobStatus.SUSPENDED && mode != TerminationMode.CANCEL_FORCEFUL) {
                // A suspended job accepts only a forceful cancellation.
                return Tuple2.tuple2(this.executionCompletionFuture, "Job is " + JobStatus.SUSPENDED);
            }
            if (this.requestedTerminationMode != null) {
                // Repeating a forceful cancel is not reported as an error (null message).
                String message = this.requestedTerminationMode == TerminationMode.CANCEL_FORCEFUL && mode == TerminationMode.CANCEL_FORCEFUL
                        ? null
                        : "Job is already terminating in mode: " + this.requestedTerminationMode.name();
                return Tuple2.tuple2(this.executionCompletionFuture, message);
            }
            this.requestedTerminationMode = mode;
            if (localStatus == JobStatus.SUSPENDED || localStatus == JobStatus.SUSPENDED_EXPORTING_SNAPSHOT) {
                // No execution is running in these states: fail the job right away.
                this.mc.setJobStatus(JobStatus.FAILED);
                this.setFinalResult(new CancellationException());
            }
            if (mode.isWithTerminalSnapshot()) {
                this.mc.snapshotContext().enqueueSnapshot(null, true, null);
            }
            result = Tuple2.tuple2(this.executionCompletionFuture, null);
        }
        finally {
            this.mc.unlock();
        }

        // The follow-up work is done outside of the lock.
        // NOTE(review): SUSPENDED_EXPORTING_SNAPSHOT is set to FAILED above but does not
        // reach completeJob() here - confirm this asymmetry is intended.
        if (localStatus == JobStatus.SUSPENDED) {
            try {
                this.mc.coordinationService().completeJob(this.mc, System.currentTimeMillis(), new CancellationException()).get();
            }
            catch (Exception e) {
                throw ExceptionUtil.rethrow(e);
            }
        } else if (localStatus == JobStatus.RUNNING || localStatus == JobStatus.STARTING) {
            this.handleTermination(mode);
        }
        return result;
    }

    /**
     * Extends the DAG with two vertices that read the snapshot map and
     * distribute its entries, and connects the second one to every vertex that
     * was in the DAG before the rewrite using a {@code SnapshotRestoreEdge}.
     *
     * @param dag          the DAG to modify in place
     * @param snapshotId   snapshot id passed to {@code SnapshotValidator.validateSnapshot}
     * @param mapName      name of the IMap holding the snapshot data
     * @param snapshotName name of the exported snapshot, passed to the validator; may be {@code null}
     * @throws UserCausedException wrapping any exception thrown during validation or rewrite
     */
    private void rewriteDagWithSnapshotRestore(DAG dag, long snapshotId, String mapName, String snapshotName) throws UserCausedException {
        try {
            IMap<Object, Object> snapshotMap = this.mc.nodeEngine().getHazelcastInstance().getMap(mapName);
            long resolvedSnapshotId = SnapshotValidator.validateSnapshot(snapshotId, snapshotMap, this.mc.jobIdString(), snapshotName);
            this.logger.info(String.format("About to restore the state of %s from snapshot %d, mapName = %s", this.mc.jobIdString(), resolvedSnapshotId, mapName));

            // Snapshot the vertex list before adding the restore vertices, so that only
            // the pre-existing vertices receive a restore edge.
            List<Vertex> originalVertices = new ArrayList<>();
            dag.iterator().forEachRemaining(originalVertices::add);

            // Filled in the loop below; the supplier lambda captures the same map instance.
            Map<String, Integer> vertexToOrdinal = new HashMap<>();
            Vertex readSnapshotVertex = dag.newVertex(SNAPSHOT_VERTEX_PREFIX + "read", SourceProcessors.readMapP(mapName));
            Vertex explodeVertex = dag.newVertex(SNAPSHOT_VERTEX_PREFIX + "explode", () -> new ExplodeSnapshotP(vertexToOrdinal, resolvedSnapshotId));
            dag.edge(Edge.between(readSnapshotVertex, explodeVertex).isolated());

            int index = 0;
            for (Vertex userVertex : originalVertices) {
                vertexToOrdinal.put(userVertex.getName(), index);
                // The restore edge takes the next free inbound ordinal of the target vertex.
                int destOrdinal = dag.getInboundEdges(userVertex.getName()).size();
                dag.edge(new SnapshotRestoreEdge(explodeVertex, index, userVertex, destOrdinal));
                ++index;
            }
        }
        catch (Exception e) {
            throw new UserCausedException(e);
        }
    }

    /**
     * Schedules a restart if the quorum size stored in the job execution record
     * is not met.
     *
     * @return {@code true} if a restart was scheduled, {@code false} if the quorum is present
     */
    private boolean scheduleRestartIfQuorumAbsent() {
        int requiredQuorum = mc.jobExecutionRecord().getQuorumSize();
        boolean quorumAbsent = !mc.coordinationService().isQuorumPresent(requiredQuorum);
        if (quorumAbsent) {
            logger.fine("Rescheduling restart of '" + mc.jobName() + "': quorum size " + requiredQuorum + " is not met");
            scheduleRestart();
        }
        return quorumAbsent;
    }

    /**
     * Schedules a restart if the coordination service reports that jobs should
     * not be started now.
     *
     * @return {@code true} if a restart was scheduled, {@code false} otherwise
     */
    private boolean scheduleRestartIfClusterIsNotSafe() {
        boolean notSafe = !mc.coordinationService().shouldStartJobs();
        if (notSafe) {
            logger.fine("Rescheduling restart of '" + mc.jobName() + "': cluster is not safe");
            scheduleRestart();
        }
        return notSafe;
    }

    /**
     * Sets the job status to {@code NOT_RUNNING} and asks the coordination
     * service to schedule a restart. The master-context lock must be held.
     *
     * @throws IllegalStateException if the current status is none of
     *         {@code NOT_RUNNING}, {@code STARTING} or {@code RUNNING}
     */
    private void scheduleRestart() {
        mc.assertLockHeld();
        JobStatus current = mc.jobStatus();
        boolean restartAllowed = current == JobStatus.NOT_RUNNING
                || current == JobStatus.STARTING
                || current == JobStatus.RUNNING;
        if (!restartAllowed) {
            throw new IllegalStateException("Restart scheduled in an unexpected state: " + current);
        }
        mc.setJobStatus(JobStatus.NOT_RUNNING);
        mc.coordinationService().scheduleRestart(mc.jobId());
    }

    /**
     * @return the members view obtained from the cluster service's membership manager
     */
    private MembersView getMembersView() {
        return ((ClusterServiceImpl) mc.nodeEngine().getClusterService())
                .getMembershipManager()
                .getMembersView();
    }

    /**
     * Callback for the responses to {@code InitExecutionOperation}. On the
     * coordinator thread it either starts the execution (no error and status
     * {@code STARTING}) or completes it with the error.
     *
     * @param responses per-member responses of the init step
     */
    private void onInitStepCompleted(Collection<Map.Entry<MemberInfo, Object>> responses) {
        mc.coordinationService().submitToCoordinatorThread(() -> {
            Throwable initError = getResult("Init", responses);
            JobStatus currentStatus = mc.jobStatus();
            if (initError == null && currentStatus == JobStatus.STARTING) {
                invokeStartExecution();
                return;
            }
            Throwable cause = initError;
            if (cause == null) {
                // Init succeeded but the status changed in the meantime.
                cause = new IllegalStateException("Cannot execute " + mc.jobIdString() + ": status is " + currentStatus);
            }
            invokeCompleteExecution(cause);
        });
    }

    /**
     * Installs a new execution failure callback, applies an already requested
     * termination, switches the job to {@code RUNNING} and sends
     * {@code StartExecutionOperation} to the participants. Snapshots are
     * scheduled unless the processing guarantee is {@code NONE}.
     */
    private void invokeStartExecution() {
        logger.fine("Executing " + mc.jobIdString());

        final long currentExecutionId = mc.executionId();
        executionFailureCallback = new ExecutionFailureCallback(currentExecutionId);

        // A termination may have been requested while the job was still starting.
        if (requestedTerminationMode != null) {
            handleTermination(requestedTerminationMode);
        }

        Function<ExecutionPlan, Operation> startOpFactory =
                plan -> new StartExecutionOperation(mc.jobId(), currentExecutionId);
        Consumer<Collection<Map.Entry<MemberInfo, Object>>> onCompleted = this::onExecuteStepCompleted;

        mc.setJobStatus(JobStatus.RUNNING);
        mc.invokeOnParticipants(startOpFactory, onCompleted, executionFailureCallback, false);

        boolean snapshotsEnabled = mc.jobConfig().getProcessingGuarantee() != ProcessingGuarantee.NONE;
        if (snapshotsEnabled) {
            mc.coordinationService().scheduleSnapshot(mc, currentExecutionId);
        }
    }

    /**
     * Acts on a termination request: modes with a terminal snapshot try to
     * begin a snapshot; other modes cancel the running invocations through the
     * execution failure callback, if one is installed.
     *
     * @param mode the termination mode to apply
     */
    private void handleTermination(@Nonnull TerminationMode mode) {
        if (mode.isWithTerminalSnapshot()) {
            mc.snapshotContext().tryBeginSnapshot();
            return;
        }
        if (executionFailureCallback != null) {
            executionFailureCallback.cancelInvocations(mode);
        }
    }

    /**
     * Callback for the responses to {@code StartExecutionOperation}: evaluates
     * them and proceeds to complete the execution with the outcome.
     *
     * @param responses per-member responses of the execution step
     */
    private void onExecuteStepCompleted(Collection<Map.Entry<MemberInfo, Object>> responses) {
        Throwable executionOutcome = getResult("Execution", responses);
        invokeCompleteExecution(executionOutcome);
    }

    /**
     * Completes the job completion future.
     *
     * @param failure the failure to complete the future exceptionally with, or
     *                {@code null} to complete it normally
     */
    void setFinalResult(Throwable failure) {
        if (failure != null) {
            jobCompletionFuture.internalCompleteExceptionally(failure);
            return;
        }
        jobCompletionFuture.internalComplete();
    }

    /**
     * Reduces the per-member responses of an operation to a single outcome.
     * <ul>
     * <li>If the job was forcefully cancelled, returns a {@code CancellationException}.</li>
     * <li>If the number of non-{@code Throwable} responses equals the size of the
     *     execution plan map, returns {@code null} (success).</li>
     * <li>If all failures are {@code TerminatedWithSnapshotException}, returns a
     *     {@code CancellationException} for {@code CANCEL_GRACEFUL}, otherwise a
     *     {@code JobTerminateRequestedException} with the requested mode.</li>
     * <li>If any failure is neither a cancellation, a terminal-snapshot termination
     *     nor a topology exception, returns the first such failure.</li>
     * <li>Otherwise returns a {@code TopologyChangedException}.</li>
     * </ul>
     *
     * @param opName    name of the operation, used in log messages ("Init" or "Execution")
     * @param responses per-member responses; a {@code Throwable} value marks a failure
     * @return the failure to report, or {@code null} on success
     */
    private Throwable getResult(String opName, Collection<Map.Entry<MemberInfo, Object>> responses) {
        if (this.isCancelled()) {
            this.logger.fine(this.mc.jobIdString() + " to be cancelled after " + opName);
            return new CancellationException();
        }

        // Re-key the responses by member address (used in log output) and split them
        // into failures (true) and successes (false).
        Map<Boolean, List<Map.Entry<?, Object>>> grouped = responses.stream()
                .<Map.Entry<?, Object>>map(en -> com.hazelcast.jet.Util.entry(en.getKey().getAddress(), en.getValue()))
                .collect(Collectors.partitioningBy(e -> e.getValue() instanceof Throwable));

        int successfulMembersCount = grouped.getOrDefault(false, Collections.emptyList()).size();
        if (successfulMembersCount == this.mc.executionPlanMap().size()) {
            this.logger.fine(opName + " of " + this.mc.jobIdString() + " was successful");
            return null;
        }

        List<Map.Entry<?, Object>> failures = grouped.getOrDefault(true, Collections.emptyList());
        if (!failures.isEmpty()) {
            this.logger.fine(opName + " of " + this.mc.jobIdString() + " has failures: " + failures);
        }

        // NOTE(review): allMatch is vacuously true for an empty failure list, so this branch
        // is also taken when the success count does not match but no failure was reported.
        if (failures.stream().allMatch(entry -> entry.getValue() instanceof TerminatedWithSnapshotException)) {
            assert (opName.equals("Execution")) : "opName is '" + opName + "', expected 'Execution'";
            this.logger.fine(opName + " of " + this.mc.jobIdString() + " terminated after a terminal snapshot");
            TerminationMode mode = this.requestedTerminationMode;
            assert (mode != null && mode.isWithTerminalSnapshot()) : "mode=" + mode;
            return mode == TerminationMode.CANCEL_GRACEFUL ? new CancellationException() : new JobTerminateRequestedException(mode);
        }

        // Separate cancellation/topology-related failures (true) from all other failures (false).
        Map<Boolean, List<Map.Entry<?, Object>>> splitFailures = failures.stream()
                .collect(Collectors.partitioningBy(e -> e.getValue() instanceof CancellationException
                        || e.getValue() instanceof TerminatedWithSnapshotException
                        || ExceptionUtil.isTopologyException((Throwable) e.getValue())));
        List<Map.Entry<?, Object>> topologyFailures = splitFailures.getOrDefault(true, Collections.emptyList());
        List<Map.Entry<?, Object>> otherFailures = splitFailures.getOrDefault(false, Collections.emptyList());
        if (!otherFailures.isEmpty()) {
            return (Throwable) otherFailures.get(0).getValue();
        }
        return new TopologyChangedException("Causes from members: " + topologyFailures);
    }

    /**
     * On the coordinator thread, sends {@code CompleteExecutionOperation} to
     * all participants and, once they respond, stores the returned metrics (if
     * no invocation failed) and continues with
     * {@link #onCompleteExecutionCompleted(Throwable)}.
     *
     * @param error the outcome of the execution; {@code null} means success
     */
    private void invokeCompleteExecution(Throwable error) {
        mc.coordinationService().submitToCoordinatorThread(() -> {
            JobStatus currentStatus = mc.jobStatus();
            boolean executionActive = currentStatus == JobStatus.STARTING || currentStatus == JobStatus.RUNNING;

            Throwable finalError;
            if (executionActive) {
                logger.fine("Sending CompleteExecutionOperation for " + mc.jobIdString());
                finalError = error;
            } else {
                logCannotComplete(error);
                // Members are told about a generic coordination failure in this case.
                finalError = new IllegalStateException("Job coordination failed");
            }

            boolean savingMetricsEnabled = mc.jobConfig().isStoreMetricsAfterJobCompletion();
            Function<ExecutionPlan, Operation> completeOpFactory =
                    plan -> new CompleteExecutionOperation(mc.executionId(), savingMetricsEnabled, finalError);

            Consumer<Collection<Map.Entry<MemberInfo, Object>>> onResponses = responses -> {
                boolean anyInvocationFailed = responses.stream()
                        .anyMatch(response -> response.getValue() instanceof Throwable);
                if (anyInvocationFailed) {
                    logger.severe(mc.jobIdString() + ": some CompleteExecutionOperation invocations failed, execution resources might leak: " + responses);
                } else {
                    setJobMetrics(responses.stream()
                            .map(response -> (RawJobMetrics) response.getValue())
                            .collect(Collectors.toList()));
                }
                // The original error, not finalError, decides how the job is finalized.
                onCompleteExecutionCompleted(error);
            };

            mc.invokeOnParticipants(completeOpFactory, onResponses, null, true);
        });
    }

    /**
     * Logs at severe level that the execution cannot be completed properly in
     * the current job status.
     *
     * @param error the execution failure to attach to the log entry, or {@code null}
     */
    private void logCannotComplete(Throwable error) {
        if (error == null) {
            logger.severe("Cannot properly complete " + mc.jobIdString() + ": status is " + mc.jobStatus());
            return;
        }
        logger.severe("Cannot properly complete failed " + mc.jobIdString() + ": status is " + mc.jobStatus(), error);
    }

    /**
     * Finalizes the job after {@code CompleteExecutionOperation} finished. If
     * the job was terminated with a terminal snapshot, finalization is deferred
     * until the terminal snapshot future completes.
     *
     * @param error the outcome of the execution; {@code null} means success
     */
    private void onCompleteExecutionCompleted(Throwable error) {
        boolean waitForTerminalSnapshot = error instanceof JobTerminateRequestedException
                && ((JobTerminateRequestedException) error).mode().isWithTerminalSnapshot();
        if (!waitForTerminalSnapshot) {
            finalizeJob(error);
            return;
        }
        mc.snapshotContext().terminalSnapshotFuture()
                .whenCompleteAsync((BiConsumer) ExceptionUtil.withTryCatch(logger, (r, e) -> finalizeJob(error)));
    }

    /**
     * Asynchronously (on the {@code "hz:async"} executor) sends
     * {@code TerminateExecutionOperation} to all participants and logs a severe
     * message if any of them returned a non-null response.
     *
     * @param jobId       id of the job
     * @param executionId id of the execution to terminate
     * @param mode        termination mode passed to the operation; may be {@code null}
     */
    void cancelExecutionInvocations(long jobId, long executionId, TerminationMode mode) {
        mc.nodeEngine().getExecutionService().execute("hz:async", () -> {
            Function<ExecutionPlan, Operation> terminateOpFactory =
                    plan -> new TerminateExecutionOperation(jobId, executionId, mode);
            Consumer<Collection<Map.Entry<MemberInfo, Object>>> onResponses = responses -> {
                boolean anyNonNullResponse = responses.stream()
                        .map(Map.Entry::getValue)
                        .anyMatch(Objects::nonNull);
                if (anyNonNullResponse) {
                    logger.severe(mc.jobIdString() + ": some TerminateExecutionOperation invocations failed, execution might remain stuck: " + responses);
                }
            };
            mc.invokeOnParticipants(terminateOpFactory, onResponses, null, true);
        });
    }

    /**
     * Submits a task to the coordinator thread that moves the job to its next
     * state after an execution ended, based on {@code failure}:
     * restart, scheduled restart, suspension, or final completion/failure.
     * If the status is already {@code COMPLETED} or {@code FAILED} the call is
     * only logged and ignored.
     *
     * @param failure the outcome of the execution; {@code null} means success
     */
    void finalizeJob(@Nullable Throwable failure) {
        this.mc.coordinationService().submitToCoordinatorThread(() -> {
            // Action to run after the lock is released; every non-returning branch assigns it.
            Runnable nonSynchronizedAction;
            this.mc.lock();
            try {
                JobStatus status = this.mc.jobStatus();
                if (status == JobStatus.COMPLETED || status == JobStatus.FAILED) {
                    this.logIgnoredCompletion(failure, status);
                    return;
                }
                this.completeVertices(failure);
                // Capture the cancellation flag before the requested mode is cleared below.
                boolean wasCancelled = this.isCancelled();
                this.requestedTerminationMode = null;
                this.executionFailureCallback = null;
                // Non-null only when the failure carries a requested termination mode.
                TerminationMode.ActionAfterTerminate terminationModeAction = failure instanceof JobTerminateRequestedException ? ((JobTerminateRequestedException)failure).mode().actionAfterTerminate() : null;
                this.mc.snapshotContext().onExecutionTerminated();
                if (terminationModeAction == TerminationMode.ActionAfterTerminate.RESTART) {
                    // Requested restart: restart immediately once the lock is released.
                    this.mc.setJobStatus(JobStatus.NOT_RUNNING);
                    nonSynchronizedAction = () -> this.mc.coordinationService().restartJob(this.mc.jobId());
                } else if (!wasCancelled && ExceptionUtil.isRestartableException(failure) && this.mc.jobConfig().isAutoScaling()) {
                    // Restartable failure with auto-scaling enabled: schedule a restart.
                    this.scheduleRestart();
                    nonSynchronizedAction = NO_OP;
                } else if (terminationModeAction == TerminationMode.ActionAfterTerminate.SUSPEND || ExceptionUtil.isRestartableException(failure) && !wasCancelled && !this.mc.jobConfig().isAutoScaling() && this.mc.jobConfig().getProcessingGuarantee() != ProcessingGuarantee.NONE) {
                    // Requested suspension, or a restartable failure without auto-scaling on a job
                    // with a processing guarantee: suspend and persist the flag after unlocking.
                    this.mc.setJobStatus(JobStatus.SUSPENDED);
                    this.mc.jobExecutionRecord().setSuspended(true);
                    nonSynchronizedAction = () -> this.mc.writeJobExecutionRecord(false);
                } else {
                    // Terminal outcome. Note that isSuccess() also logs the result.
                    this.mc.setJobStatus(this.isSuccess(failure) ? JobStatus.COMPLETED : JobStatus.FAILED);
                    if (failure instanceof LocalMemberResetException) {
                        this.logger.fine("Cancelling job " + this.mc.jobIdString() + " locally: member (local or remote) reset. We don't delete job metadata: job will restart on majority cluster");
                        this.setFinalResult(new CancellationException());
                        // Returns without calling completeJob() and without completing
                        // executionCompletionFuture below.
                        return;
                    }
                    // The final result is published only after completeJob() succeeded.
                    this.mc.coordinationService().completeJob(this.mc, System.currentTimeMillis(), failure).whenComplete((BiConsumer)ExceptionUtil.withTryCatch(this.logger, (r, f) -> {
                        if (f != null) {
                            this.logger.warning("Completion of " + this.mc.jobIdString() + " failed", (Throwable)f);
                        } else {
                            this.setFinalResult(failure);
                        }
                    }));
                    nonSynchronizedAction = NO_OP;
                }
            }
            finally {
                this.mc.unlock();
            }
            // Outside of the lock: signal the end of this execution, then run the deferred action.
            this.executionCompletionFuture.complete(null);
            nonSynchronizedAction.run();
        });
    }

    /**
     * Logs the outcome of the execution together with the time elapsed since
     * the execution start and tells whether it was successful.
     *
     * @param failure the outcome of the execution; {@code null} means success
     * @return {@code true} iff {@code failure} is {@code null}
     */
    private boolean isSuccess(@Nullable Throwable failure) {
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - executionStartTime);
        if (failure == null) {
            logger.info(String.format("Execution of %s completed in %,d ms", mc.jobIdString(), elapsedMillis));
            return true;
        }

        boolean terminationRequested = failure instanceof CancellationException
                || failure instanceof JobTerminateRequestedException;
        if (terminationRequested) {
            // Requested terminations are expected: log at info level without a stack trace.
            logger.info(String.format("Execution of %s completed in %,d ms, reason=%s", mc.jobIdString(), elapsedMillis, failure));
        } else {
            logger.severe(String.format("Execution of %s failed after %,d ms", mc.jobIdString(), elapsedMillis), failure);
        }
        return false;
    }

    /**
     * Logs at severe level that a completion is ignored because of the current
     * job status.
     *
     * @param failure the ignored failure to attach to the log entry, or {@code null}
     * @param status  the status that caused the completion to be ignored
     */
    private void logIgnoredCompletion(@Nullable Throwable failure, JobStatus status) {
        String jobIdText = com.hazelcast.jet.Util.idToString(mc.jobId());
        if (failure == null) {
            logger.severe("Ignoring completion of " + jobIdText + " because status is " + status);
            return;
        }
        logger.severe("Ignoring failure completion of " + jobIdText + " because status is " + status, failure);
    }

    /**
     * Calls {@code close(failure)} on the meta-supplier of every vertex
     * remembered from the last resolved DAG. An exception thrown by one
     * meta-supplier is logged and does not prevent closing the others.
     * Does nothing if no DAG has been resolved yet.
     *
     * @param failure the outcome of the execution passed on to
     *                {@code close}; {@code null} means success
     */
    private void completeVertices(@Nullable Throwable failure) {
        // Read the volatile field once so the null check and the iteration see the same set.
        Set<Vertex> currentVertices = this.vertices;
        if (currentVertices == null) {
            return;
        }
        for (Vertex vertex : currentVertices) {
            try {
                vertex.getMetaSupplier().close(failure);
            }
            catch (Exception e) {
                this.logger.severe(this.mc.jobIdString() + " encountered an exception in ProcessorMetaSupplier.close(), ignoring it", e);
            }
        }
    }

    /**
     * Resumes a suspended job: under the lock moves it from {@code SUSPENDED}
     * to {@code NOT_RUNNING} and then tries to start it. If the job is not
     * suspended, only an info message is logged.
     *
     * @param executionIdSupplier supplies the id for the new execution
     */
    void resumeJob(Supplier<Long> executionIdSupplier) {
        mc.lock();
        try {
            boolean suspended = mc.jobStatus() == JobStatus.SUSPENDED;
            if (!suspended) {
                logger.info("Not resuming " + mc.jobIdString() + ": not " + JobStatus.SUSPENDED + ", but " + mc.jobStatus());
                return;
            }
            mc.setJobStatus(JobStatus.NOT_RUNNING);
        } finally {
            mc.unlock();
        }
        // The start itself happens outside of the lock.
        logger.fine("Resuming job " + mc.jobName());
        tryStartJob(executionIdSupplier);
    }

    /**
     * @param uuid UUID of a member
     * @return {@code true} if the UUID belongs to the local member or to a
     *         member in the current execution plan map
     */
    private boolean hasParticipant(String uuid) {
        Map<MemberInfo, ExecutionPlan> plans = mc.executionPlanMap();
        if (mc.nodeEngine().getLocalMember().getUuid().equals(uuid)) {
            return true;
        }
        if (plans == null) {
            return false;
        }
        for (MemberInfo participant : plans.keySet()) {
            if (participant.getUuid().equals(uuid)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Reacts to a graceful shutdown of a member.
     *
     * @param uuid UUID of the member shutting down
     * @return the future of a graceful termination if the member participates
     *         in this job (or is the local member), otherwise an already
     *         completed future
     */
    @Nonnull
    CompletableFuture<Void> onParticipantGracefulShutdown(String uuid) {
        if (hasParticipant(uuid)) {
            return gracefullyTerminate();
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Requests a {@code RESTART_GRACEFUL} termination on the coordinator
     * thread.
     *
     * @return a future that completes when the execution completion future
     *         returned by {@link #requestTermination} completes
     */
    @Nonnull
    CompletableFuture<Void> gracefullyTerminate() {
        // The submitted task itself yields a future; flatten the two levels with thenCompose.
        CompletableFuture<CompletableFuture<Void>> future = this.mc.coordinationService()
                .submitToCoordinatorThread(() -> this.requestTermination(TerminationMode.RESTART_GRACEFUL, false).f0());
        return future.thenCompose(Function.identity());
    }

    /**
     * Requests a graceful restart of a running job when the number of members
     * in its execution plan differs from the given member count. Must be called
     * on the coordinator thread.
     *
     * @param dataMembersWithPartitionsCount current number of data members with assigned partitions
     * @return {@code true} if nothing has to be done (auto-scaling disabled, no
     *         execution plan, or plan size already equal to the count) or the
     *         restart was requested; {@code false} if the job is not
     *         {@code RUNNING} or the termination request was refused
     */
    boolean maybeScaleUp(int dataMembersWithPartitionsCount) {
        mc.coordinationService().assertOnCoordinatorThread();
        if (!mc.jobConfig().isAutoScaling()) {
            return true;
        }
        if (mc.executionPlanMap() == null || mc.executionPlanMap().size() == dataMembersWithPartitionsCount) {
            LoggingUtil.logFine(logger, "Not scaling up %s: not running or already running on all members", mc.jobIdString());
            return true;
        }

        if (mc.jobStatus() != JobStatus.RUNNING) {
            return false;
        }
        // A non-null message means the termination request was not accepted.
        String refusalMessage = requestTermination(TerminationMode.RESTART_GRACEFUL, false).f1();
        if (refusalMessage != null) {
            return false;
        }
        logger.info("Requested restart of " + mc.jobIdString() + " to make use of added member(s). Job was running on " + mc.executionPlanMap().size() + " members, cluster now has " + dataMembersWithPartitionsCount + " data members with assigned partitions");
        return true;
    }

    /**
     * @return the metrics last stored by {@link #setJobMetrics}; an empty list initially
     */
    List<RawJobMetrics> jobMetrics() {
        return jobMetrics;
    }

    /**
     * Stores the given metrics.
     *
     * @param jobMetrics the metrics to store
     * @throws NullPointerException if {@code jobMetrics} is {@code null}
     */
    private void setJobMetrics(List<RawJobMetrics> jobMetrics) {
        List<RawJobMetrics> checked = Objects.requireNonNull(jobMetrics);
        this.jobMetrics = checked;
    }

    /**
     * Completes {@code clientFuture} with the job's metrics. For a
     * {@code RUNNING} job the metrics are requested from all participants with
     * {@code GetLocalJobMetricsOperation}; otherwise the stored metrics are
     * returned immediately.
     *
     * @param clientFuture the future to complete with the metrics
     */
    void collectMetrics(CompletableFuture<List<RawJobMetrics>> clientFuture) {
        if (mc.jobStatus() != JobStatus.RUNNING) {
            clientFuture.complete(jobMetrics);
            return;
        }
        long jobId = mc.jobId();
        long executionId = mc.executionId();
        Function<ExecutionPlan, Operation> metricsOpFactory = plan -> new GetLocalJobMetricsOperation(jobId, executionId);
        Consumer<Collection<Map.Entry<MemberInfo, Object>>> onResponses =
                responses -> completeWithMetrics(clientFuture, responses);
        mc.invokeOnParticipants(metricsOpFactory, onResponses, null, false);
    }

    /**
     * Handles the responses to {@code GetLocalJobMetricsOperation}.
     * <ul>
     * <li>If any member answered with {@code ExecutionNotFoundException}, the
     *     collection is retried after {@code COLLECT_METRICS_RETRY_DELAY_MILLIS}.</li>
     * <li>If any response is another {@code Throwable}, the client future is
     *     completed exceptionally with the first one.</li>
     * <li>Otherwise the client future is completed with the collected metrics.</li>
     * </ul>
     *
     * @param clientFuture the future to complete
     * @param metrics      per-member responses
     */
    private void completeWithMetrics(CompletableFuture<List<RawJobMetrics>> clientFuture, Collection<Map.Entry<MemberInfo, Object>> metrics) {
        if (metrics.stream().anyMatch(en -> en.getValue() instanceof GetLocalJobMetricsOperation.ExecutionNotFoundException)) {
            LoggingUtil.logFinest(this.logger, "Rescheduling collectMetrics for %s, some members threw %s", this.mc.jobIdString(), GetLocalJobMetricsOperation.ExecutionNotFoundException.class.getSimpleName());
            this.mc.nodeEngine().getExecutionService().schedule(() -> this.collectMetrics(clientFuture), COLLECT_METRICS_RETRY_DELAY_MILLIS, TimeUnit.MILLISECONDS);
            return;
        }
        // Class::cast narrows the stream to Throwable so the result needs no unchecked cast.
        Throwable firstThrowable = metrics.stream()
                .map(Map.Entry::getValue)
                .filter(Throwable.class::isInstance)
                .map(Throwable.class::cast)
                .findFirst()
                .orElse(null);
        if (firstThrowable != null) {
            clientFuture.completeExceptionally(firstThrowable);
        } else {
            clientFuture.complete(metrics.stream().map(e -> (RawJobMetrics)e.getValue()).collect(Collectors.toList()));
        }
    }

    /**
     * Failure callback for the invocations of one execution. It cancels the
     * execution's invocations at most once, either on a failure or on an
     * explicit request.
     */
    private class ExecutionFailureCallback
    implements Consumer<Throwable> {
        private final long executionId;
        // Flipped to true by the first cancelInvocations() call; later calls are no-ops.
        private final AtomicBoolean invocationsCancelled = new AtomicBoolean();

        ExecutionFailureCallback(long executionId) {
            this.executionId = executionId;
        }

        /**
         * Cancels the invocations (with a {@code null} mode) unless the peeled
         * failure is a {@code TerminatedWithSnapshotException}.
         */
        @Override
        public void accept(Throwable t) {
            boolean terminatedWithSnapshot = ExceptionUtil.peel(t) instanceof TerminatedWithSnapshotException;
            if (terminatedWithSnapshot) {
                return;
            }
            cancelInvocations(null);
        }

        /**
         * Cancels the invocations of this execution on the first call only.
         *
         * @param mode termination mode passed on to the participants; may be {@code null}
         */
        void cancelInvocations(TerminationMode mode) {
            boolean firstCall = invocationsCancelled.compareAndSet(false, true);
            if (!firstCall) {
                return;
            }
            MasterJobContext.this.cancelExecutionInvocations(MasterJobContext.this.mc.jobId(), executionId, mode);
        }
    }

    /**
     * Edge used to deliver restored snapshot data to a vertex. It is
     * distributed, partitioned by the entry key and reports
     * {@link #SNAPSHOT_RESTORE_EDGE_PRIORITY} as its priority.
     */
    private static class SnapshotRestoreEdge
    extends Edge {
        SnapshotRestoreEdge(Vertex source, int sourceOrdinal, Vertex destination, int destOrdinal) {
            super(source, sourceOrdinal, destination, destOrdinal);
            this.distributed();
            this.partitioned(Functions.entryKey());
        }

        /**
         * @return the fixed priority {@link #SNAPSHOT_RESTORE_EDGE_PRIORITY}
         */
        @Override
        public int getPriority() {
            return SNAPSHOT_RESTORE_EDGE_PRIORITY;
        }
    }

    /**
     * Internal checked wrapper that carries a failure to
     * {@code tryStartJob}, which finalizes the job with the wrapped cause.
     * It is created with an empty message, with suppression disabled and
     * without a writable stack trace.
     */
    private static class UserCausedException
    extends Exception {
        UserCausedException(Exception cause) {
            super(
                    "",
                    cause,
                    false,  // enableSuppression
                    false   // writableStackTrace
            );
        }
    }
}

