/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.cluster.ClusterState;
import com.hazelcast.cluster.impl.MemberImpl;
import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.MwCounter;
import com.hazelcast.internal.util.executor.ExecutorType;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.JobAlreadyExistsException;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.JetProperties;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.impl.JetService;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.JobResult;
import com.hazelcast.jet.impl.JobSummary;
import com.hazelcast.jet.impl.MasterContext;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.EnteringPassiveClusterStateException;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.metrics.RawJobMetrics;
import com.hazelcast.jet.impl.observer.ObservableImpl;
import com.hazelcast.jet.impl.observer.WrappedThrowable;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.ringbuffer.OverflowPolicy;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.executionservice.ExecutionService;
import com.hazelcast.spi.properties.HazelcastProperties;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Spliterators;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class JobCoordinationService {
    // Name of the executor on which this service schedules its coordination tasks (registered in the constructor).
    private static final String COORDINATOR_EXECUTOR_NAME = "jet:coordinator";
    // Delay before retrying a job restart or a scale-up attempt (2 seconds).
    private static final long RETRY_DELAY_IN_MILLIS = TimeUnit.SECONDS.toMillis(2L);
    // NOTE(review): not referenced in the visible part of this file; presumably marks coordinator threads — confirm.
    private static final ThreadLocal<Boolean> IS_JOB_COORDINATOR_THREAD = ThreadLocal.withInitial(() -> false);
    // Pool size for the coordinator executor; the constructor currently passes the equal literal 4.
    private static final int COORDINATOR_THREADS_POOL_SIZE = 4;
    // NOTE(review): not referenced in the visible part of this file; presumably a lower bound for the job scan period — confirm.
    private static final int MIN_JOB_SCAN_PERIOD_MILLIS = 100;
    private final NodeEngineImpl nodeEngine;
    private final JetService jetService;
    private final JetConfig config;
    private final ILogger logger;
    private final JobRepository jobRepository;
    // Master contexts of jobs currently coordinated by this member, keyed by job id.
    private final ConcurrentMap<Long, MasterContext> masterContexts = new ConcurrentHashMap<Long, MasterContext>();
    // Members that announced a graceful shutdown, mapped to a future completed once the jobs have reacted to it.
    // While non-empty, shouldStartJobs() returns false; entries are removed in onMemberRemoved.
    private final ConcurrentMap<UUID, CompletableFuture<Void>> membersShuttingDown = new ConcurrentHashMap<UUID, CompletableFuture<Void>>();
    // Members removed without a prior shutdown notification, mapped to the System.nanoTime() of removal.
    // Entries older than one hour are pruned in onMemberRemoved; an entry is dropped when the member is added again.
    private final Map<UUID, Long> removedMembers = new ConcurrentHashMap<UUID, Long>();
    // Held while toggling isClusterEnteringPassiveState and while adding/clearing master contexts.
    private final Object lock = new Object();
    // Set by prepareForPassiveClusterState(), cleared by clusterChangeDone(); while set, checkOperationalState() throws.
    private volatile boolean isClusterEnteringPassiveState;
    // Cleared by reset(); while false, hasActiveJobWithName() throws a retryable exception.
    private volatile boolean jobsScanned;
    // Incremented for every scheduled scale-up; a scheduled scale-up only runs if no newer one was scheduled since.
    private final AtomicInteger scaleUpScheduledCount = new AtomicInteger();
    // The three counters below are exposed via @Probe and registered as static metrics in the constructor.
    @Probe(name="jobs.submitted")
    private final Counter jobSubmitted = MwCounter.newMwCounter();
    @Probe(name="jobs.completedSuccessfully")
    private final Counter jobCompletedSuccessfully = MwCounter.newMwCounter();
    @Probe(name="jobs.completedWithFailure")
    private final Counter jobCompletedWithFailure = MwCounter.newMwCounter();
    // Read from JetProperties.JOB_SCAN_PERIOD in startScanningForJobs().
    private long maxJobScanPeriodInMillis;

    /**
     * Creates the service, registers the coordinator executor and exposes this
     * instance's {@code @Probe} counters as static metrics tagged {@code module=jet}.
     */
    JobCoordinationService(NodeEngineImpl nodeEngine, JetService jetService, JetConfig config, JobRepository jobRepository) {
        this.nodeEngine = nodeEngine;
        this.jetService = jetService;
        this.config = config;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.jobRepository = jobRepository;
        nodeEngine.getExecutionService().register(COORDINATOR_EXECUTOR_NAME, COORDINATOR_THREADS_POOL_SIZE,
                Integer.MAX_VALUE, ExecutorType.CACHED);
        MetricsRegistry metricsRegistry = nodeEngine.getMetricsRegistry();
        MetricDescriptor jetModuleDescriptor = metricsRegistry.newMetricDescriptor().withTag("module", "jet");
        metricsRegistry.registerStaticMetrics(jetModuleDescriptor, (Object) this);
    }

    /** Returns the repository this service reads and writes job records with. */
    public JobRepository jobRepository() {
        JobRepository repository = this.jobRepository;
        return repository;
    }

    /**
     * Reads the configured job scan period and schedules the first job scan to
     * run immediately on the coordinator executor.
     */
    void startScanningForJobs() {
        ExecutionService executor = this.nodeEngine.getExecutionService();
        this.maxJobScanPeriodInMillis = new HazelcastProperties(this.config.getProperties())
                .getMillis(JetProperties.JOB_SCAN_PERIOD);
        executor.schedule(COORDINATOR_EXECUTOR_NAME, this::scanJobs, 0L, TimeUnit.MILLISECONDS);
    }

    /**
     * Submits a new job on the master.
     * <p>
     * The work runs on the coordinator thread. The returned future completes once
     * the submission itself has been processed, not when the job finishes. It
     * completes exceptionally if the submission fails, for example when this
     * member is not the master, the cluster is entering the passive state, or
     * another active job has the same name.
     *
     * @param jobId            id of the new job
     * @param serializedDag    serialized {@link DAG} of the job
     * @param serializedConfig serialized {@link JobConfig} of the job
     * @return future completed when the submission has been handled
     */
    public CompletableFuture<Void> submitJob(long jobId, Data serializedDag, Data serializedConfig) {
        CompletableFuture<Void> res = new CompletableFuture<Void>();
        this.submitToCoordinatorThread(() -> {
            MasterContext masterContext;
            try {
                JobConfig config = (JobConfig)this.nodeEngine.getSerializationService().toObject((Object)serializedConfig);
                this.assertIsMaster("Cannot submit job " + com.hazelcast.jet.Util.idToString(jobId) + " to non-master node");
                this.checkOperationalState();
                JobResult jobResult = this.jobRepository.getJobResult(jobId);
                if (jobResult != null) {
                    this.logger.fine("Not starting job " + com.hazelcast.jet.Util.idToString(jobId) + " since already completed with result: " + jobResult);
                    return;
                }
                int quorumSize = config.isSplitBrainProtectionEnabled() ? this.getQuorumSize() : 0;
                DAG dag = this.deserializeDag(jobId, config, serializedDag);
                Set<String> ownedObservables = JobCoordinationService.ownedObservables(dag);
                JobRecord jobRecord = new JobRecord(jobId, serializedDag, this.dagToJson(dag), config, ownedObservables);
                JobExecutionRecord jobExecutionRecord = new JobExecutionRecord(jobId, quorumSize, false);
                masterContext = this.createMasterContext(jobRecord, jobExecutionRecord);
                boolean hasDuplicateJobName;
                // The master/state re-check, the duplicate-name check and the registration
                // of the master context must be atomic with respect to other users of the lock.
                // The decompiled source had lost this block (only MONITORENTER/EXIT comments remained).
                synchronized (this.lock) {
                    this.assertIsMaster("Cannot submit job " + com.hazelcast.jet.Util.idToString(jobId) + " to non-master node");
                    this.checkOperationalState();
                    hasDuplicateJobName = config.getName() != null && this.hasActiveJobWithName(config.getName());
                    if (!hasDuplicateJobName) {
                        MasterContext prev = this.masterContexts.putIfAbsent(jobId, masterContext);
                        if (prev != null) {
                            this.logger.fine("Joining to already existing masterContext " + prev.jobIdString());
                            return;
                        }
                    }
                }
                if (hasDuplicateJobName) {
                    this.jobRepository.deleteJob(jobId);
                    throw new JobAlreadyExistsException("Another active job with equal name (" + config.getName() + ") exists: " + com.hazelcast.jet.Util.idToString(jobId));
                }
                // The job may have completed concurrently; if so, the context is completed here.
                if (this.completeMasterContextIfJobAlreadyCompleted(masterContext)) {
                    return;
                }
                this.jobSubmitted.inc();
                this.jobRepository.putNewJobRecord(jobRecord);
                this.logger.info("Starting job " + com.hazelcast.jet.Util.idToString(masterContext.jobId()) + " based on submit request");
            }
            catch (Throwable e) {
                res.completeExceptionally(e);
                throw e;
            }
            finally {
                // No-op if already completed exceptionally above.
                res.complete(null);
            }
            this.tryStartJob(masterContext);
        });
        return res;
    }

    /**
     * Collects the names of observables owned by the vertices of the DAG, taken
     * from each meta-supplier's {@code ObservableImpl.OWNED_OBSERVABLE} tag.
     * Vertices without that tag are skipped.
     */
    private static Set<String> ownedObservables(DAG dag) {
        Set<String> observables = new HashSet<String>();
        dag.iterator().forEachRemaining(vertex -> {
            String observable = vertex.getMetaSupplier().getTags().get(ObservableImpl.OWNED_OBSERVABLE);
            if (observable != null) {
                observables.add(observable);
            }
        });
        return observables;
    }

    /** Creates a new, unregistered master context for the given job records. */
    MasterContext createMasterContext(JobRecord jobRecord, JobExecutionRecord jobExecutionRecord) {
        MasterContext context = new MasterContext(this.nodeEngine, this, jobRecord, jobExecutionRecord);
        return context;
    }

    /**
     * Tells whether any current master context has a job config with the given name.
     *
     * @throws RetryableHazelcastException if the master has not yet scanned for jobs
     */
    private boolean hasActiveJobWithName(@Nonnull String jobName) {
        if (this.jobsScanned) {
            for (MasterContext ctx : this.masterContexts.values()) {
                if (jobName.equals(ctx.jobConfig().getName())) {
                    return true;
                }
            }
            return false;
        }
        throw new RetryableHazelcastException("Cannot submit job with name '" + jobName + "' before the master node initializes job coordination service's state");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks the cluster as entering the passive state, so new jobs are rejected.
     * It then asks every active job to terminate gracefully.
     *
     * @return future completed when all graceful terminations have completed
     */
    public CompletableFuture<Void> prepareForPassiveClusterState() {
        this.assertIsMaster("Cannot prepare for passive cluster state on a non-master node");
        synchronized (this.lock) {
            this.isClusterEnteringPassiveState = true;
        }
        return this.submitToCoordinatorThread(() -> {
            CompletableFuture<?>[] terminations = this.masterContexts.values().stream()
                    .map(mc -> mc.jobContext().gracefullyTerminate())
                    .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(terminations);
        }).thenCompose(Function.identity());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /** Clears the "entering passive state" flag set by {@link #prepareForPassiveClusterState()}. */
    void clusterChangeDone() {
        synchronized (this.lock) {
            this.isClusterEnteringPassiveState = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Drops all master contexts, e.g. after this member stopped being the master.
     * Each dropped context's job context gets a {@link CancellationException} as its final result.
     */
    public void reset() {
        assert !this.isMaster() : "this member is a master";
        List<MasterContext> droppedContexts;
        synchronized (this.lock) {
            droppedContexts = new ArrayList<MasterContext>(this.masterContexts.values());
            this.masterContexts.clear();
            this.jobsScanned = false;
        }
        for (MasterContext ctx : droppedContexts) {
            ctx.jobContext().setFinalResult(new CancellationException());
        }
    }

    /**
     * Returns a future that completes when the job with the given id completes.
     * <p>
     * If a master context exists, its completion future is adapted. A
     * {@link CancellationException} or {@link JetException} failure is rethrown
     * as-is, and any other failure is wrapped in a new {@link JetException}.
     * If the job already has a result, that result's future is returned. If
     * only a job record exists, the job is started (unless already started or
     * completed) with the reason "join request from client".
     *
     * @param jobId id of the job to join
     * @return future completed with {@code null} on success, exceptionally on failure
     */
    public CompletableFuture<Void> joinSubmittedJob(long jobId) {
        this.checkOperationalState();
        CompletableFuture<CompletableFuture> future = this.callWithJob(jobId, mc -> mc.jobContext().jobCompletionFuture().handle((r, t) -> {
            if (t == null) {
                return null;
            }
            if (t instanceof CancellationException || t instanceof JetException) {
                throw ExceptionUtil.sneakyThrow(t);
            }
            throw new JetException(t.toString(), (Throwable)t);
        }), JobResult::asCompletableFuture, jobRecord -> {
            // No master context and no result yet: make sure the job runs so there is something to join.
            JobExecutionRecord jobExecutionRecord = this.ensureExecutionRecord(jobId, this.jobRepository.getJobExecutionRecord(jobId));
            return this.startJobIfNotStartedOrCompleted((JobRecord)jobRecord, jobExecutionRecord, "join request from client");
        }, null);
        // callWithJob yields a future of the completion future; unwrap it.
        return future.thenCompose(Function.identity());
    }

    /**
     * Requests termination of a job in the given mode.
     * <p>
     * Only a RUNNING job can be terminated, except with {@code CANCEL_FORCEFUL}.
     * Forceful cancellation of an already completed job is ignored. A job that
     * is known only by its execution record results in a retryable exception.
     */
    public CompletableFuture<Void> terminateJob(long jobId, TerminationMode terminationMode) {
        boolean forceful = terminationMode == TerminationMode.CANCEL_FORCEFUL;
        Consumer<MasterContext> onActiveJob = masterContext -> {
            JobStatus jobStatus = masterContext.jobStatus();
            if (!forceful && jobStatus != JobStatus.RUNNING) {
                throw new IllegalStateException("Cannot " + terminationMode + ", job status is " + jobStatus + ", should be " + JobStatus.RUNNING);
            }
            String terminationResult = masterContext.jobContext().requestTermination(terminationMode, false).f1();
            if (terminationResult != null) {
                throw new IllegalStateException("Cannot " + terminationMode + ": " + terminationResult);
            }
        };
        Consumer<JobResult> onCompletedJob = jobResult -> {
            if (!forceful) {
                throw new IllegalStateException("Cannot " + terminationMode + " job " + com.hazelcast.jet.Util.idToString(jobId) + " because it already has a result: " + jobResult);
            }
            this.logger.fine("Ignoring cancellation of a completed job " + com.hazelcast.jet.Util.idToString(jobId));
        };
        Consumer<JobExecutionRecord> onUndiscoveredJob = jobExecutionRecord -> {
            throw new RetryableHazelcastException("No MasterContext found for job " + com.hazelcast.jet.Util.idToString(jobId) + " for " + terminationMode);
        };
        return this.runWithJob(jobId, onActiveJob, onCompletedJob, onUndiscoveredJob);
    }

    /** Returns the ids of all jobs: those in the repository plus those with a master context. */
    public CompletableFuture<List<Long>> getAllJobIds() {
        this.assertIsMaster("Cannot query list of job ids on non-master node");
        return this.submitToCoordinatorThread(() -> {
            Set<Long> ids = new HashSet<Long>(this.jobRepository.getAllJobIds());
            ids.addAll(this.masterContexts.keySet());
            return new ArrayList<Long>(ids);
        });
    }

    /**
     * Returns the ids of active and completed jobs with the given name.
     * <p>
     * The ids are ordered by creation time, newest first. Active jobs are keyed
     * with {@code Long.MAX_VALUE}, so they come before completed ones, unless a
     * result for the same id overrides that key.
     */
    public CompletableFuture<List<Long>> getJobIds(@Nonnull String name) {
        this.assertIsMaster("Cannot query list of job ids on non-master node");
        return this.submitToCoordinatorThread(() -> {
            Map<Long, Long> jobs = new HashMap<Long, Long>();
            for (MasterContext ctx : this.masterContexts.values()) {
                if (!name.equals(ctx.jobConfig().getName())) continue;
                jobs.put(ctx.jobId(), Long.MAX_VALUE);
            }
            this.jobRepository.getJobResults(name).forEach(jobResult -> jobs.put(jobResult.getJobId(), jobResult.getCreationTime()));
            // An explicit type witness is required: Comparator.comparing(Map.Entry::getValue).reversed()
            // cannot infer the entry type and does not compile.
            return jobs.entrySet().stream()
                    .sorted(Map.Entry.<Long, Long>comparingByValue().reversed())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
        });
    }

    /**
     * Returns the status of a job.
     * <p>
     * A RUNNING job with a requested termination is reported as COMPLETING. A
     * job known only by its execution record is SUSPENDED or NOT_RUNNING.
     */
    public CompletableFuture<JobStatus> getJobStatus(long jobId) {
        return this.callWithJob(jobId,
                mc -> {
                    JobStatus current = mc.jobStatus();
                    if (current == JobStatus.RUNNING && mc.jobContext().requestedTerminationMode() != null) {
                        return JobStatus.COMPLETING;
                    }
                    return current;
                },
                JobResult::getJobStatus,
                null,
                jobExecutionRecord -> {
                    if (jobExecutionRecord.isSuspended()) {
                        return JobStatus.SUSPENDED;
                    }
                    return JobStatus.NOT_RUNNING;
                });
    }

    /**
     * Returns the metrics of a job.
     * <p>
     * For an active job, the metrics are collected from its job context. For a
     * completed job, they are the stored metrics, or an empty list if none were
     * stored. For a job known only by its execution record, the list is empty.
     * If the lookup fails (e.g. the job is not found), the returned future
     * completes exceptionally instead of never completing.
     */
    public CompletableFuture<List<RawJobMetrics>> getJobMetrics(long jobId) {
        CompletableFuture<List<RawJobMetrics>> cf = new CompletableFuture<List<RawJobMetrics>>();
        this.runWithJob(jobId, mc -> mc.jobContext().collectMetrics(cf), jobResult -> {
            List<RawJobMetrics> metrics = this.jobRepository.getJobMetrics(jobId);
            cf.complete(metrics != null ? metrics : Collections.emptyList());
        }, record -> cf.complete(Collections.emptyList())).whenComplete((ignored, failure) -> {
            // Without this, a failed lookup would leave cf uncompleted forever.
            // completeExceptionally is a no-op if cf was already completed.
            if (failure != null) {
                cf.completeExceptionally(failure);
            }
        });
        return cf;
    }

    /** Returns the creation time of a job, taken from its master context, result or record. */
    public CompletableFuture<Long> getJobSubmissionTime(long jobId) {
        return this.callWithJob(jobId,
                mc -> mc.jobRecord().getCreationTime(),
                jobResult -> jobResult.getCreationTime(),
                jobRecord -> jobRecord.getCreationTime(),
                null);
    }

    /**
     * Resumes a suspended job through its master context.
     * Fails if the job has completed, and fails retryably if it is not yet discovered.
     */
    public CompletableFuture<Void> resumeJob(long jobId) {
        Consumer<MasterContext> resume = masterContext -> masterContext.jobContext().resumeJob(this.jobRepository::newExecutionId);
        Consumer<JobResult> rejectCompleted = jobResult -> {
            throw new IllegalStateException("Job already completed");
        };
        Consumer<JobExecutionRecord> rejectUndiscovered = jobExecutionRecord -> {
            throw new RetryableHazelcastException("Job " + com.hazelcast.jet.Util.idToString(jobId) + " not yet discovered");
        };
        return this.runWithJob(jobId, resume, rejectCompleted, rejectUndiscovered);
    }

    /**
     * Returns summaries of all known jobs, newest submission first.
     * <p>
     * Summaries built from job records come first. For the same job id, a
     * summary built from a job result replaces the record-based one.
     */
    public CompletableFuture<List<JobSummary>> getJobSummaryList() {
        return this.submitToCoordinatorThread(() -> {
            // Typed map: with the raw HashMap, collect(...) returned Object and the
            // lambda did not produce a List<JobSummary>.
            Map<Long, JobSummary> jobs = new HashMap<Long, JobSummary>();
            this.jobRepository.getJobRecords().stream().map(this::getJobSummary).forEach(s -> jobs.put(s.getJobId(), s));
            this.jobRepository.getJobResults().stream().map(r -> new JobSummary(r.getJobId(), r.getJobNameOrId(), r.getJobStatus(), r.getCreationTime(), r.getCompletionTime(), r.getFailureText())).forEach(s -> jobs.put(s.getJobId(), s));
            return jobs.values().stream().sorted(Comparator.comparing(JobSummary::getSubmissionTime).reversed()).collect(Collectors.toList());
        });
    }

    /**
     * Registers a member that announced a graceful shutdown. Every active job is
     * notified.
     *
     * @param uuid uuid of the shutting-down member
     * @return future completed once all jobs have handled the member's shutdown. If
     *         the member is already registered, its existing future is returned. If
     *         the member was already removed from the cluster, the returned future
     *         is already completed.
     */
    @Nonnull
    public CompletableFuture<Void> addShuttingDownMember(UUID uuid) {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        CompletableFuture<Void> oldFuture = this.membersShuttingDown.putIfAbsent(uuid, future);
        if (oldFuture != null) {
            return oldFuture;
        }
        if (this.removedMembers.containsKey(uuid)) {
            LoggingUtil.logFine(this.logger, "NotifyMemberShutdownOperation received for a member that was already removed from the cluster: %s", uuid);
            // onMemberRemoved() will not run again for this member, so the entry put above would
            // never be removed, and shouldStartJobs() would keep returning false. Remove the entry
            // and complete the future, in case a concurrent caller already obtained it.
            this.membersShuttingDown.remove(uuid, future);
            future.complete(null);
            return future;
        }
        LoggingUtil.logFine(this.logger, "Added a shutting-down member: %s", uuid);
        CompletableFuture[] futures = (CompletableFuture[])this.masterContexts.values().stream().map(mc -> mc.jobContext().onParticipantGracefulShutdown(uuid)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).whenComplete((BiConsumer)ExceptionUtil.withTryCatch(this.logger, (r, e) -> future.complete(null)));
        return future;
    }

    /** Returns a mutable snapshot copy of the current master contexts, keyed by job id. */
    public Map<Long, MasterContext> getMasterContexts() {
        Map<Long, MasterContext> snapshot = new HashMap<Long, MasterContext>(this.masterContexts);
        return snapshot;
    }

    /** Returns the master context of the given job, or {@code null} if there is none. */
    public MasterContext getMasterContext(long jobId) {
        return this.masterContexts.get(jobId);
    }

    /** Returns the owning Jet service. */
    JetService getJetService() {
        JetService service = this.jetService;
        return service;
    }

    /**
     * Tells whether jobs may be started now. This requires that this member is a
     * running master and that the cluster is neither passive nor in transition. It
     * also requires that no member is shutting down and that partitions are in a
     * safe, initialized state. Each refusal is logged at FINE level.
     */
    boolean shouldStartJobs() {
        boolean runningMaster = this.isMaster() && this.nodeEngine.isRunning();
        if (!runningMaster) {
            return false;
        }
        ClusterState state = this.nodeEngine.getClusterService().getClusterState();
        boolean passiveOrTransitioning = this.isClusterEnteringPassiveState
                || state == ClusterState.PASSIVE
                || state == ClusterState.IN_TRANSITION;
        if (passiveOrTransitioning) {
            this.logger.fine("Not starting jobs because cluster is in passive state or in transition.");
            return false;
        }
        if (!this.membersShuttingDown.isEmpty()) {
            LoggingUtil.logFine(this.logger, "Not starting jobs because members are shutting down: %s", this.membersShuttingDown.keySet());
            return false;
        }
        InternalPartitionServiceImpl partitionService = this.getInternalPartitionService();
        if (!partitionService.isMemberStateSafe()) {
            this.logger.fine("Not starting jobs because master is not in safe state.");
            return false;
        }
        if (!partitionService.getPartitionStateManager().isInitialized()) {
            this.logger.fine("Not starting jobs because partitions are not yet initialized.");
            return false;
        }
        return true;
    }

    /**
     * Void-returning variant of {@link #callWithJob}.
     * <p>
     * No job-record handler is passed. A job known only by its {@link JobRecord}
     * therefore falls through to {@code JobNotFoundException}; previously, a
     * wrapped null consumer caused a {@code NullPointerException} there. A null
     * {@code jobExecutionRecordHandler} is passed on as null rather than as a
     * function that would fail when invoked.
     */
    private CompletableFuture<Void> runWithJob(long jobId, @Nonnull Consumer<MasterContext> masterContextHandler, @Nonnull Consumer<JobResult> jobResultHandler, @Nullable Consumer<JobExecutionRecord> jobExecutionRecordHandler) {
        Function<MasterContext, Void> onMasterContext = this.toNullFunction(masterContextHandler);
        Function<JobResult, Void> onJobResult = this.toNullFunction(jobResultHandler);
        Function<JobExecutionRecord, Void> onExecutionRecord = jobExecutionRecordHandler == null
                ? null
                : this.toNullFunction(jobExecutionRecordHandler);
        return this.callWithJob(jobId, onMasterContext, onJobResult, null, onExecutionRecord);
    }

    /**
     * Adapts a consumer to a function that runs it and returns {@code null}.
     * A null consumer yields a null function, so {@code @Nullable} handler
     * arguments stay recognizably absent instead of failing when invoked.
     */
    private <T, R> Function<T, R> toNullFunction(@Nullable Consumer<T> consumer) {
        if (consumer == null) {
            return null;
        }
        return val -> {
            consumer.accept(val);
            return null;
        };
    }

    /**
     * Applies one of the given handlers on the coordinator thread, depending on where the job is found.
     * <p>
     * Lookup order:
     * <ol>
     * <li>an active {@link MasterContext};</li>
     * <li>a stored {@link JobResult};</li>
     * <li>a {@link JobExecutionRecord}, if a handler for it was given;</li>
     * <li>a {@link JobRecord}, if a handler for it was given;</li>
     * <li>the job result once more.</li>
     * </ol>
     * If none is found, the coordinator task throws {@link JobNotFoundException}. Presumably
     * this surfaces as an exceptional completion of the returned future; confirm against
     * submitToCoordinatorThread.
     *
     * @throws IllegalArgumentException if both {@code jobRecordHandler} and
     *         {@code jobExecutionRecordHandler} are null
     */
    private <T> CompletableFuture<T> callWithJob(long jobId, @Nonnull Function<MasterContext, T> masterContextHandler, @Nonnull Function<JobResult, T> jobResultHandler, @Nullable Function<JobRecord, T> jobRecordHandler, @Nullable Function<JobExecutionRecord, T> jobExecutionRecordHandler) {
        this.assertIsMaster("Cannot do this task on non-master. jobId=" + com.hazelcast.jet.Util.idToString(jobId));
        if (jobRecordHandler == null && jobExecutionRecordHandler == null) {
            throw new IllegalArgumentException();
        }
        return this.submitToCoordinatorThread(() -> {
            JobRecord jobRecord;
            JobExecutionRecord jobExRecord;
            MasterContext mc = (MasterContext)this.masterContexts.get(jobId);
            if (mc != null) {
                return masterContextHandler.apply(mc);
            }
            JobResult jobResult = this.jobRepository.getJobResult(jobId);
            if (jobResult != null) {
                return jobResultHandler.apply(jobResult);
            }
            if (jobExecutionRecordHandler != null && (jobExRecord = this.jobRepository.getJobExecutionRecord(jobId)) != null) {
                return jobExecutionRecordHandler.apply(jobExRecord);
            }
            if (jobRecordHandler != null && (jobRecord = this.jobRepository.getJobRecord(jobId)) != null) {
                return jobRecordHandler.apply(jobRecord);
            }
            // Second look-up of the result. NOTE(review): presumably guards against the job
            // completing between the first result check and the record look-ups above; confirm.
            jobResult = this.jobRepository.getJobResult(jobId);
            if (jobResult != null) {
                return jobResultHandler.apply(jobResult);
            }
            throw new JobNotFoundException(jobId);
        });
    }

    /**
     * Handles a member joining the cluster. The member is no longer treated as removed.
     * For a data (non-lite) member, quorum values are updated and a scale-up is scheduled
     * after the configured delay.
     */
    void onMemberAdded(MemberImpl addedMember) {
        this.removedMembers.remove(addedMember.getUuid());
        if (!addedMember.isLiteMember()) {
            this.updateQuorumValues();
            long scaleUpDelay = this.config.getInstanceConfig().getScaleUpDelayMillis();
            this.scheduleScaleUp(scaleUpDelay);
        }
    }

    /**
     * Handles a member leaving the cluster.
     * <p>
     * If the member had announced a shutdown, it is dropped from the shutting-down set.
     * Otherwise its removal time is recorded. Removal records older than one hour
     * are then pruned.
     */
    void onMemberRemoved(UUID uuid) {
        if (this.membersShuttingDown.remove(uuid) != null) {
            LoggingUtil.logFine(this.logger, "Removed a shutting-down member: %s, now shuttingDownMembers=%s", uuid, this.membersShuttingDown.keySet());
        } else {
            this.removedMembers.put(uuid, System.nanoTime());
        }
        long now = System.nanoTime();
        long retentionNanos = TimeUnit.HOURS.toNanos(1L);
        // nanoTime values have an arbitrary origin and may wrap around. Compare elapsed
        // differences, not absolute values, and don't compute `now - 1h`, which can overflow.
        this.removedMembers.values().removeIf(removedAt -> now - removedAt > retentionNanos);
    }

    /** Tells whether the cluster currently has at least {@code quorumSize} data members. */
    boolean isQuorumPresent(int quorumSize) {
        int dataMembers = this.getDataMemberCount();
        return quorumSize <= dataMembers;
    }

    /**
     * Records the completion of a job on the coordinator thread.
     * <p>
     * Metrics are stored if the job config asks for it. If this exact master
     * context is still registered, it is removed, its owned observables are
     * completed and the success/failure counter is incremented. Otherwise a
     * severe message is logged.
     *
     * @param error the failure, or {@code null} if the job completed successfully
     */
    @CheckReturnValue
    CompletableFuture<Void> completeJob(MasterContext masterContext, Throwable error) {
        return this.submitToCoordinatorThread(() -> {
            List<RawJobMetrics> jobMetrics = null;
            if (masterContext.jobConfig().isStoreMetricsAfterJobCompletion()) {
                jobMetrics = masterContext.jobContext().jobMetrics();
            }
            this.jobRepository.completeJob(masterContext, jobMetrics, error);
            long jobId = masterContext.jobId();
            if (!this.masterContexts.remove(jobId, masterContext)) {
                MasterContext existing = this.masterContexts.get(jobId);
                if (existing == null) {
                    this.logger.severe("No master context found to complete " + masterContext.jobIdString());
                } else {
                    this.logger.severe("Different master context found to complete " + masterContext.jobIdString() + ", master context execution " + com.hazelcast.jet.Util.idToString(existing.executionId()));
                }
                return;
            }
            this.completeObservables(masterContext.jobRecord().getOwnedObservables(), error);
            this.logger.fine(masterContext.jobIdString() + " is completed");
            if (error == null) {
                this.jobCompletedSuccessfully.inc();
            } else {
                this.jobCompletedWithFailure.inc();
            }
        });
    }

    /** Schedules a restart of the job on the coordinator executor after {@code RETRY_DELAY_IN_MILLIS}. */
    void scheduleRestart(long jobId) {
        MasterContext masterContext = this.masterContexts.get(jobId);
        if (masterContext != null) {
            this.logger.fine("Scheduling restart on master for job " + masterContext.jobName());
            this.nodeEngine.getExecutionService().schedule(COORDINATOR_EXECUTOR_NAME, () -> this.restartJob(jobId), RETRY_DELAY_IN_MILLIS, TimeUnit.MILLISECONDS);
            return;
        }
        this.logger.severe("Master context for job " + com.hazelcast.jet.Util.idToString(jobId) + " not found to schedule restart");
    }

    /** Schedules the next snapshot of the given execution after the job's configured snapshot interval. */
    void scheduleSnapshot(MasterContext mc, long executionId) {
        ExecutionService executor = this.nodeEngine.getExecutionService();
        long intervalMillis = mc.jobConfig().getSnapshotIntervalMillis();
        if (this.logger.isFineEnabled()) {
            this.logger.fine(mc.jobIdString() + " snapshot is scheduled in " + intervalMillis + "ms");
        }
        executor.schedule(COORDINATOR_EXECUTOR_NAME, () -> mc.snapshotContext().startScheduledSnapshot(executionId), intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Tries to start the job again if it still has a master context; otherwise logs a severe message. */
    void restartJob(long jobId) {
        MasterContext masterContext = this.masterContexts.get(jobId);
        if (masterContext != null) {
            this.tryStartJob(masterContext);
            return;
        }
        this.logger.severe("Master context for job " + com.hazelcast.jet.Util.idToString(jobId) + " not found to restart");
    }

    /** @throws EnteringPassiveClusterStateException while the cluster is entering the passive state */
    private void checkOperationalState() {
        if (!this.isClusterEnteringPassiveState) {
            return;
        }
        throw new EnteringPassiveClusterStateException();
    }

    /**
     * Schedules a scale-up attempt after {@code delay} ms. Each call bumps a counter,
     * so only the most recently scheduled attempt actually runs.
     */
    private void scheduleScaleUp(long delay) {
        int ticket = this.scaleUpScheduledCount.incrementAndGet();
        ExecutionService executor = this.nodeEngine.getExecutionService();
        executor.schedule(() -> this.scaleJobsUpNow(ticket), delay, TimeUnit.MILLISECONDS);
    }

    /**
     * Asks every job to scale up to the number of data members, capped by the
     * partition count. Does nothing if a newer scale-up was scheduled. Re-schedules
     * itself if jobs can't be started now or if any job did not succeed.
     */
    private void scaleJobsUpNow(int counter) {
        boolean superseded = this.scaleUpScheduledCount.get() != counter;
        if (superseded) {
            return;
        }
        if (this.shouldStartJobs()) {
            this.submitToCoordinatorThread(() -> {
                int dataMembers = this.getDataMemberCount();
                int partitions = this.nodeEngine.getPartitionService().getPartitionCount();
                int targetSize = Math.min(dataMembers, partitions);
                boolean allSucceeded = true;
                // Every context is asked, even after a failure.
                for (MasterContext mc : this.masterContexts.values()) {
                    if (!mc.jobContext().maybeScaleUp(targetSize)) {
                        allSucceeded = false;
                    }
                }
                if (!allSucceeded) {
                    this.scheduleScaleUp(RETRY_DELAY_IN_MILLIS);
                }
            });
        } else {
            this.scheduleScaleUp(RETRY_DELAY_IN_MILLIS);
        }
    }

    /**
     * On the coordinator thread, pushes the current quorum size to every
     * split-brain-protected job in the repository. Failures are logged, both per
     * job and for the whole task.
     */
    private void updateQuorumValues() {
        if (this.shouldCheckQuorumValues()) {
            this.submitToCoordinatorThread(() -> {
                try {
                    int currentQuorumSize = this.getQuorumSize();
                    for (JobRecord jobRecord : this.jobRepository.getJobRecords()) {
                        this.updateQuorumSizeOfJob(jobRecord, currentQuorumSize);
                    }
                }
                catch (Exception e) {
                    this.logger.severe("update quorum values task failed", e);
                }
            });
        }
    }

    // Pushes the quorum size to one job (repository record and/or master context); logs and swallows failures.
    private void updateQuorumSizeOfJob(JobRecord jobRecord, int currentQuorumSize) {
        try {
            if (!jobRecord.getConfig().isSplitBrainProtectionEnabled()) {
                return;
            }
            MasterContext masterContext = this.masterContexts.get(jobRecord.getJobId());
            if (masterContext == null) {
                this.jobRepository.updateJobQuorumSizeIfSmaller(jobRecord.getJobId(), currentQuorumSize);
                // Look again: a context may have been created meanwhile.
                masterContext = this.masterContexts.get(jobRecord.getJobId());
            }
            if (masterContext != null) {
                masterContext.updateQuorumSize(currentQuorumSize);
            }
        }
        catch (Exception e) {
            this.logger.severe("Quorum of job " + com.hazelcast.jet.Util.idToString(jobRecord.getJobId()) + " could not be updated to " + currentQuorumSize, e);
        }
    }

    /** Quorum values are only checked on a running master whose partition state is initialized. */
    private boolean shouldCheckQuorumValues() {
        if (!this.isMaster() || !this.nodeEngine.isRunning()) {
            return false;
        }
        return this.getInternalPartitionService().getPartitionStateManager().isInitialized();
    }

    /** Deserializes the job's DAG using the job's own class loader. */
    private DAG deserializeDag(long jobId, JobConfig jobConfig, Data dagData) {
        ClassLoader jobClassLoader = this.jetService.getJobExecutionService().getClassLoader(jobConfig, jobId);
        Object dag = CustomClassLoadedObject.deserializeWithCustomClassLoader(this.nodeEngine.getSerializationService(), jobClassLoader, dagData);
        return (DAG) dag;
    }

    /** Renders the DAG as JSON, using this instance's cooperative thread count. */
    private String dagToJson(DAG dag) {
        int cooperativeThreads = Util.getJetInstance((NodeEngine)this.nodeEngine)
                .getConfig()
                .getInstanceConfig()
                .getCooperativeThreadCount();
        return dag.toJson(cooperativeThreads).toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates and registers a master context for the job unless one exists or
     * the job already has a result. A job that is not suspended is then started.
     *
     * @return the completion future of the existing or new master context, or the
     *         future of the job's stored result
     */
    private CompletableFuture<Void> startJobIfNotStartedOrCompleted(@Nonnull JobRecord jobRecord, @Nonnull JobExecutionRecord jobExecutionRecord, String reason) {
        long jobId = jobRecord.getJobId();
        JobResult existingResult = this.jobRepository.getJobResult(jobId);
        if (existingResult != null) {
            this.logger.fine("Not starting job " + com.hazelcast.jet.Util.idToString(jobId) + ", already has result: " + existingResult);
            return existingResult.asCompletableFuture();
        }
        MasterContext newContext;
        MasterContext runningContext;
        synchronized (this.lock) {
            this.checkOperationalState();
            newContext = this.createMasterContext(jobRecord, jobExecutionRecord);
            runningContext = this.masterContexts.putIfAbsent(jobId, newContext);
        }
        if (runningContext != null) {
            return runningContext.jobContext().jobCompletionFuture();
        }
        if (!this.completeMasterContextIfJobAlreadyCompleted(newContext)) {
            if (jobExecutionRecord.isSuspended()) {
                LoggingUtil.logFinest(this.logger, "MasterContext for suspended %s is created", newContext.jobIdString());
            } else {
                this.logger.info("Starting job " + com.hazelcast.jet.Util.idToString(jobId) + ": " + reason);
                this.tryStartJob(newContext);
            }
        }
        return newContext.jobContext().jobCompletionFuture();
    }

    /**
     * Completes the master context if the job needs no (further) execution.
     * <p>
     * If the job already has a result, that result is set as the context's final
     * result and the context is unregistered. If auto-scaling is disabled and the
     * job has been executed before, the job is finalized with a
     * {@link TopologyChangedException}.
     *
     * @return {@code true} if the context was handled here
     */
    private boolean completeMasterContextIfJobAlreadyCompleted(MasterContext masterContext) {
        long jobId = masterContext.jobId();
        JobResult jobResult = this.jobRepository.getJobResult(jobId);
        if (jobResult == null) {
            boolean autoScalingDisabled = !masterContext.jobConfig().isAutoScaling();
            if (autoScalingDisabled && masterContext.jobExecutionRecord().executed()) {
                this.logger.info("Suspending or failing " + masterContext.jobIdString() + " since auto-restart is disabled and the job has been executed before");
                masterContext.jobContext().finalizeJob(new TopologyChangedException());
                return true;
            }
            return false;
        }
        this.logger.fine("Completing master context for " + masterContext.jobIdString() + " since already completed with result: " + jobResult);
        masterContext.jobContext().setFinalResult(jobResult.getFailureAsThrowable());
        return this.masterContexts.remove(jobId, masterContext);
    }

    /**
     * Asks the context's job context to start the job, using the job repository as the source of
     * new execution IDs.
     */
    private void tryStartJob(MasterContext masterContext) {
        JobRepository repository = this.jobRepository;
        masterContext.jobContext().tryStartJob(repository::newExecutionId);
    }

    /**
     * Returns the smallest member count that is strictly more than half of the current data
     * members.
     */
    private int getQuorumSize() {
        int dataMembers = this.getDataMemberCount();
        return dataMembers / 2 + 1;
    }

    /** Counts the cluster members matched by {@link MemberSelectors#DATA_MEMBER_SELECTOR}. */
    private int getDataMemberCount() {
        return this.nodeEngine.getClusterService()
                .getMembers(MemberSelectors.DATA_MEMBER_SELECTOR)
                .size();
    }

    /**
     * Builds a {@link JobSummary} for the given job record.
     * <p>
     * If a {@link MasterContext} is registered for the job, its current execution ID and status
     * are used. Otherwise the execution ID is reported as {@code 0}, and the status is
     * {@link JobStatus#SUSPENDED} when the stored execution record exists and is suspended, or
     * {@link JobStatus#NOT_RUNNING} in every other case (including a missing execution record).
     *
     * @param record the job record to summarize
     * @return the summary for the job
     */
    private JobSummary getJobSummary(JobRecord record) {
        long jobId = record.getJobId();
        MasterContext ctx = (MasterContext) this.masterContexts.get(jobId);
        long execId;
        JobStatus status;
        if (ctx != null) {
            execId = ctx.executionId();
            status = ctx.jobStatus();
        } else {
            execId = 0L;
            // Only consult the repository when no live context exists.
            JobExecutionRecord executionRecord = this.jobRepository.getJobExecutionRecord(jobId);
            status = executionRecord != null && executionRecord.isSuspended()
                    ? JobStatus.SUSPENDED
                    : JobStatus.NOT_RUNNING;
        }
        return new JobSummary(jobId, execId, record.getJobNameOrId(), status, record.getCreationTime());
    }

    /**
     * Returns this node's partition service, cast to {@link InternalPartitionServiceImpl}.
     */
    private InternalPartitionServiceImpl getInternalPartitionService() {
        return (InternalPartitionServiceImpl) this.nodeEngine.getNode().getPartitionService();
    }

    /**
     * Periodic job-scanning task that reschedules itself after every run.
     * <p>
     * On the master member, stored jobs are scanned when {@code shouldStartJobs()} returns
     * {@code true}; when it returns {@code false}, the next attempt is scheduled after 100 ms
     * instead of {@code maxJobScanPeriodInMillis}. Non-master members only reschedule.
     * <p>
     * Every {@link Throwable} raised by the scan is caught (and logged unless it is a
     * {@link HazelcastInstanceNotActiveException}) so that the next run is always scheduled.
     * Letting an {@link Error} escape would skip the reschedule and stop job scanning for good.
     */
    private void scanJobs() {
        // Retry delay used when shouldStartJobs() says jobs cannot be started yet.
        final long notReadyScanDelayMillis = 100L;
        long nextScanDelay = this.maxJobScanPeriodInMillis;
        try {
            if (this.isMaster()) {
                if (this.shouldStartJobs()) {
                    this.doScanJobs();
                } else {
                    nextScanDelay = notReadyScanDelayMillis;
                }
            }
        } catch (HazelcastInstanceNotActiveException ignored) {
            // The instance is no longer active (e.g. shutting down); nothing useful to report.
        } catch (Throwable e) {
            this.logger.severe("Scanning jobs failed", e);
        }
        ExecutionService executionService = this.nodeEngine.getExecutionService();
        executionService.schedule(this::scanJobs, nextScanDelay, TimeUnit.MILLISECONDS);
    }

    /**
     * Visits every stored job record and calls {@code startJobIfNotStartedOrCompleted} for it
     * (creating a default execution record if none is stored), then asks the job repository to
     * clean up. On the first pass that gets this far, {@code jobsScanned} is set under
     * {@code lock}.
     * <p>
     * NOTE(review): CFR reported removing a try/catch while decompiling this method; compare with
     * the upstream source before relying on its exact exception behaviour.
     */
    private void doScanJobs() {
        for (JobRecord record : this.jobRepository.getJobRecords()) {
            long jobId = record.getJobId();
            JobExecutionRecord storedRecord = this.jobRepository.getJobExecutionRecord(jobId);
            JobExecutionRecord executionRecord = this.ensureExecutionRecord(jobId, storedRecord);
            this.startJobIfNotStartedOrCompleted(record, executionRecord, "discovered by scanning of JobRecords");
        }
        this.jobRepository.cleanup((NodeEngine) this.nodeEngine);
        if (this.jobsScanned) {
            return;
        }
        synchronized (this.lock) {
            this.jobsScanned = true;
        }
    }

    /**
     * Returns {@code record} when present; otherwise builds a new {@link JobExecutionRecord} for
     * {@code jobId} with the current quorum size and {@code false} as the last constructor
     * argument (NOTE(review): confirm that flag's meaning in JobExecutionRecord).
     */
    private JobExecutionRecord ensureExecutionRecord(long jobId, JobExecutionRecord record) {
        if (record == null) {
            return new JobExecutionRecord(jobId, this.getQuorumSize(), false);
        }
        return record;
    }

    /**
     * Throws a {@link JetException} unless this member is the cluster master.
     *
     * @param error message prefix; the current master address is appended to it
     */
    void assertIsMaster(String error) {
        if (this.isMaster()) {
            return;
        }
        throw new JetException(error + ". Master address: " + this.nodeEngine.getClusterService().getMasterAddress());
    }

    /** Reports whether the cluster service considers this member the master. */
    private boolean isMaster() {
        ClusterService clusterService = this.nodeEngine.getClusterService();
        return clusterService.isMaster();
    }

    /** Returns the {@link NodeEngineImpl} held by this service. */
    NodeEngineImpl nodeEngine() {
        return nodeEngine;
    }

    /**
     * {@link Runnable} variant of {@link #submitToCoordinatorThread(Callable)}; the returned
     * future completes with {@code null} once {@code action} has run.
     */
    CompletableFuture<Void> submitToCoordinatorThread(Runnable action) {
        Callable<Void> asCallable = () -> {
            action.run();
            return null;
        };
        return this.submitToCoordinatorThread(asCallable);
    }

    /**
     * Runs {@code action} on a job coordinator thread and returns a future for its result.
     * <p>
     * If the calling thread is already flagged as a coordinator thread (via the
     * {@code IS_JOB_COORDINATOR_THREAD} thread-local), {@code action} runs inline and an
     * already-completed future is returned. Otherwise it is submitted to the
     * {@code COORDINATOR_EXECUTOR_NAME} executor. The submitted task raises the thread-local flag
     * for the duration of the call and clears it afterwards.
     * <p>
     * A failure of {@code action} is logged at WARNING level on both paths, because callers may
     * ignore the returned future. The failure is also delivered through the returned future.
     *
     * @param action the work to run
     * @param <T>    result type
     * @return a future completed with the action's result or failure
     */
    <T> CompletableFuture<T> submitToCoordinatorThread(Callable<T> action) {
        if (IS_JOB_COORDINATOR_THREAD.get()) {
            // Already on a coordinator thread: run inline (the submitted task below asserts that
            // the flag is not already raised).
            try {
                return CompletableFuture.completedFuture(action.call());
            } catch (Throwable e) {
                this.logger.warning(null, e);
                return Util.exceptionallyCompletedFuture(e);
            }
        }
        Future<T> future = this.nodeEngine.getExecutionService().submit(COORDINATOR_EXECUTOR_NAME, () -> {
            assert !IS_JOB_COORDINATOR_THREAD.get() : "flag already raised";
            IS_JOB_COORDINATOR_THREAD.set(true);
            try {
                return action.call();
            } catch (Throwable e) {
                this.logger.warning(null, e);
                throw e;
            } finally {
                IS_JOB_COORDINATOR_THREAD.set(false);
            }
        });
        return this.nodeEngine.getExecutionService().asCompletableFuture(future);
    }

    /**
     * Assertion-only check (active with {@code -ea}) that the calling thread is flagged as a job
     * coordinator thread.
     */
    void assertOnCoordinatorThread() {
        assert IS_JOB_COORDINATOR_THREAD.get() : "not on coordinator thread";
    }

    /**
     * Publishes a completion item to the ringbuffer backing each named observable.
     * <p>
     * The item is {@link DoneItem#DONE_ITEM} when {@code error} is {@code null}, otherwise
     * {@code WrappedThrowable.of(error)}. Items are added with {@link OverflowPolicy#OVERWRITE}
     * via {@code addAsync}, and the returned futures are not awaited.
     * <p>
     * A failure while completing one observable is logged at SEVERE level and does not prevent
     * the remaining observables from being completed.
     *
     * @param observables names of the observables to complete
     * @param error       failure to publish to the observables, or {@code null} to publish the
     *                    done marker
     */
    private void completeObservables(Set<String> observables, Throwable error) {
        for (String observable : observables) {
            try {
                String ringbufferName = ObservableImpl.ringbufferName(observable);
                Ringbuffer<Object> ringbuffer = this.nodeEngine.getHazelcastInstance().getRingbuffer(ringbufferName);
                Object completion = error == null ? DoneItem.DONE_ITEM : WrappedThrowable.of(error);
                ringbuffer.addAsync(completion, OverflowPolicy.OVERWRITE);
            } catch (Exception e) {
                // Keep going: one broken observable must not leave the others uncompleted.
                this.logger.severe("Failed to complete observable '" + observable + "': " + e.getMessage(), e);
            }
        }
    }
}

