/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.cluster.ClusterState;
import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.Member;
import com.hazelcast.core.MemberLeftException;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.JobAlreadyExistsException;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.impl.ClusterMetadata;
import com.hazelcast.jet.impl.JetService;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.JobResult;
import com.hazelcast.jet.impl.JobSummary;
import com.hazelcast.jet.impl.MasterContext;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.EnteringPassiveClusterStateException;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.operation.GetClusterMetadataOperation;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.JetProperties;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.exception.TargetNotMemberException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.executionservice.InternalExecutionService;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.util.Clock;
import com.hazelcast.util.executor.ExecutorType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nonnull;

public class JobCoordinationService {
    /** Name of the executor that is registered in {@code startScanningForJobs()} and used for all coordinator tasks. */
    private static final String COORDINATOR_EXECUTOR_NAME = "jet:coordinator";
    /** Delay, in milliseconds, used when scheduling a job restart or re-scheduling a scale-up attempt. */
    private static final long RETRY_DELAY_IN_MILLIS = TimeUnit.SECONDS.toMillis(2L);
    /**
     * Per-thread flag, raised only while an action submitted through
     * {@code submitToCoordinatorThread} is running on the current thread, so that
     * nested submissions are executed inline instead of being queued again.
     */
    private static final ThreadLocal<Boolean> IS_JOB_COORDINATOR_THREAD = ThreadLocal.withInitial(() -> false);
    private final NodeEngineImpl nodeEngine;
    private final JetService jetService;
    private final JetConfig config;
    private final ILogger logger;
    private final JobRepository jobRepository;
    /** Master contexts of the jobs currently tracked by this member, keyed by job id. */
    private final ConcurrentMap<Long, MasterContext> masterContexts = new ConcurrentHashMap<Long, MasterContext>();
    /**
     * Members that announced shutdown, keyed by member UUID. The value is the future
     * returned from {@code addShuttingDownMember}; entries are removed in
     * {@code onMemberRemoved}.
     */
    private final ConcurrentMap<String, CompletableFuture<Void>> membersShuttingDown = new ConcurrentHashMap<String, CompletableFuture<Void>>();
    /**
     * UUIDs of members removed without a prior shutdown announcement, mapped to the
     * {@code System.nanoTime()} value taken at removal. Old entries are pruned in
     * {@code onMemberRemoved}.
     */
    private final Map<String, Long> removedMembers = new ConcurrentHashMap<String, Long>();
    /**
     * Monitor held while writing {@code isClusterEnteringPassiveState} and
     * {@code jobsScanned} and while registering or clearing entries of
     * {@code masterContexts} together with the operational-state check.
     */
    private final Object lock = new Object();
    /** Set by {@code prepareForPassiveClusterState()}, cleared by {@code clusterChangeDone()}. */
    private volatile boolean isClusterEnteringPassiveState;
    /** Becomes {@code true} after the first successful {@code scanJobs()} run; cleared by {@code reset()}. */
    private volatile boolean jobsScanned;
    /** Incremented for every scheduled scale-up; a scheduled task only acts if its captured value is still the current one. */
    private final AtomicInteger scaleUpScheduledCount = new AtomicInteger();

    /**
     * Creates the service and obtains its logger from the given node engine.
     *
     * @param nodeEngine    node engine of the local member
     * @param jetService    owning Jet service
     * @param config        Jet configuration
     * @param jobRepository repository used to read and write job metadata
     */
    JobCoordinationService(NodeEngineImpl nodeEngine, JetService jetService, JetConfig config, JobRepository jobRepository) {
        this.jobRepository = jobRepository;
        this.config = config;
        this.jetService = jetService;
        this.nodeEngine = nodeEngine;
        this.logger = nodeEngine.getLogger(getClass());
    }

    /**
     * @return the job repository passed to the constructor
     */
    public JobRepository jobRepository() {
        return jobRepository;
    }

    /**
     * Registers the coordinator executor and schedules {@code scanJobs()} on it,
     * starting immediately and repeating with the period configured by
     * {@code JetProperties.JOB_SCAN_PERIOD}.
     */
    void startScanningForJobs() {
        InternalExecutionService executor = nodeEngine.getExecutionService();
        long scanPeriodMillis = new HazelcastProperties(config.getProperties())
                .getMillis(JetProperties.JOB_SCAN_PERIOD);
        executor.register(COORDINATOR_EXECUTOR_NAME, 2, Integer.MAX_VALUE, ExecutorType.CACHED);
        executor.scheduleWithRepetition(
                COORDINATOR_EXECUTOR_NAME, this::scanJobs, 0L, scanPeriodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Handles a job submission on the master member.
     * <p>
     * The work runs on the coordinator thread. A job that already has a result is
     * ignored. Otherwise a master context is created and registered under the lock;
     * if a context for the same job id already exists the call returns without
     * starting anything. A job whose name equals that of an active job is deleted
     * from the repository and rejected.
     *
     * @param jobId            id of the submitted job
     * @param dag              serialized DAG
     * @param serializedConfig serialized {@code JobConfig}
     * @return future completed when the submission was processed; completed
     *         exceptionally e.g. with {@code JobAlreadyExistsException} for a duplicate name
     */
    public CompletableFuture<Void> submitJob(long jobId, Data dag, Data serializedConfig) {
        return submitToCoordinatorThread(() -> {
            JobConfig jobConfig = nodeEngine.getSerializationService().toObject(serializedConfig);
            assertIsMaster("Cannot submit job " + com.hazelcast.jet.Util.idToString(jobId) + " to non-master node");
            checkOperationalState();

            // A stored result means the job already ran to completion - nothing to start.
            JobResult existingResult = jobRepository.getJobResult(jobId);
            if (existingResult != null) {
                logger.fine("Not starting job " + com.hazelcast.jet.Util.idToString(jobId)
                        + " since already completed with result: " + existingResult);
                return;
            }

            int quorumSize = 0;
            if (jobConfig.isSplitBrainProtectionEnabled()) {
                quorumSize = getQuorumSize();
            }
            String dagJson = dagToJson(jobId, jobConfig, dag);
            JobRecord jobRecord = new JobRecord(jobId, Clock.currentTimeMillis(), dag, dagJson, jobConfig);
            JobExecutionRecord executionRecord = new JobExecutionRecord(jobId, quorumSize, false);
            MasterContext masterContext = createMasterContext(jobRecord, executionRecord);

            final boolean hasDuplicateJobName;
            synchronized (lock) {
                // Re-check under the lock: mastership or cluster state may have changed meanwhile.
                assertIsMaster("Cannot submit job " + com.hazelcast.jet.Util.idToString(jobId) + " to non-master node");
                checkOperationalState();
                String jobName = jobConfig.getName();
                hasDuplicateJobName = jobName != null && hasActiveJobWithName(jobName);
                if (!hasDuplicateJobName) {
                    MasterContext previous = masterContexts.putIfAbsent(jobId, masterContext);
                    if (previous != null) {
                        logger.fine("Joining to already existing masterContext " + previous.jobIdString());
                        return;
                    }
                }
            }

            if (hasDuplicateJobName) {
                jobRepository.deleteJob(jobId);
                throw new JobAlreadyExistsException("Another active job with equal name (" + jobConfig.getName()
                        + ") exists: " + com.hazelcast.jet.Util.idToString(jobId));
            }
            if (completeMasterContextIfJobAlreadyCompleted(masterContext)) {
                return;
            }

            jobRepository.putNewJobRecord(jobRecord);
            logger.info("Starting job " + com.hazelcast.jet.Util.idToString(masterContext.jobId())
                    + " based on submit request");
            nodeEngine.getExecutionService().execute(COORDINATOR_EXECUTOR_NAME, () -> tryStartJob(masterContext));
        });
    }

    /**
     * Creates a new {@code MasterContext} bound to this service and its node engine.
     *
     * @param jobRecord          record of the job
     * @param jobExecutionRecord execution record of the job
     * @return the newly created master context (not yet registered in {@code masterContexts})
     */
    MasterContext createMasterContext(JobRecord jobRecord, JobExecutionRecord jobExecutionRecord) {
        MasterContext created = new MasterContext(nodeEngine, this, jobRecord, jobExecutionRecord);
        return created;
    }

    /**
     * Tells whether any registered master context belongs to a job with the given name.
     *
     * @param jobName the name to look for
     * @return {@code true} if a tracked job has this name
     * @throws RetryableHazelcastException if the first job scan has not completed yet
     */
    private boolean hasActiveJobWithName(@Nonnull String jobName) {
        if (!jobsScanned) {
            throw new RetryableHazelcastException("Cannot submit job with name '" + jobName
                    + "' before the master node initializes job coordination service state");
        }
        for (MasterContext ctx : masterContexts.values()) {
            if (jobName.equals(ctx.jobConfig().getName())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Marks the cluster as entering passive state and requests graceful termination
     * of every job that currently has a master context.
     *
     * @return future that completes when all graceful-termination futures have completed
     * @throws JetException if this member is not the master
     */
    public CompletableFuture<Void> prepareForPassiveClusterState() {
        assertIsMaster("Cannot prepare for passive cluster state on a non-master node");
        synchronized (lock) {
            isClusterEnteringPassiveState = true;
        }
        List<CompletableFuture<?>> terminations = new ArrayList<>();
        for (MasterContext mc : masterContexts.values()) {
            terminations.add(mc.jobContext().gracefullyTerminate());
        }
        return CompletableFuture.allOf(terminations.toArray(new CompletableFuture[0]));
    }

    /**
     * Clears the "entering passive state" flag raised by
     * {@code prepareForPassiveClusterState()}.
     */
    void clusterChangeDone() {
        synchronized (lock) {
            isClusterEnteringPassiveState = false;
        }
    }

    /**
     * Drops all coordination state held by this member.
     * <p>
     * Under the lock, the registered master contexts are copied and the map is
     * cleared, and {@code jobsScanned} is reset. Outside the lock, each removed
     * context gets a {@link CancellationException} as its final result.
     * Asserts that this member is not the master.
     */
    public void reset() {
        assert !isMaster() : "this member is a master";
        // Typed list: with a raw ArrayList the lambda below would receive Object elements.
        List<MasterContext> contexts;
        synchronized (lock) {
            contexts = new ArrayList<>(masterContexts.values());
            masterContexts.clear();
            jobsScanned = false;
        }
        // Complete the contexts outside the lock so callbacks do not run while holding it.
        contexts.forEach(ctx -> ctx.jobContext().setFinalResult(new CancellationException()));
    }

    /**
     * Returns the completion future of a previously submitted job, starting the job
     * if it has a record but is neither started nor completed.
     *
     * @param jobId id of the job to join
     * @return the job's completion future; completed exceptionally with
     *         {@code JobNotFoundException} if neither a job record nor a job result exists
     * @throws JetException if this member is not the master
     */
    public CompletableFuture<Void> joinSubmittedJob(long jobId) {
        assertIsMaster("Cannot join job " + com.hazelcast.jet.Util.idToString(jobId) + " on non-master node");
        checkOperationalState();
        return submitToCoordinatorThread(() -> {
            JobRecord jobRecord = jobRepository.getJobRecord(jobId);
            if (jobRecord == null) {
                // No record: the job is either already completed or unknown.
                JobResult jobResult = jobRepository.getJobResult(jobId);
                if (jobResult == null) {
                    throw new JobNotFoundException(jobId);
                }
                return jobResult.asCompletableFuture();
            }
            JobExecutionRecord executionRecord =
                    ensureExecutionRecord(jobId, jobRepository.getJobExecutionRecord(jobId));
            return startJobIfNotStartedOrCompleted(jobRecord, executionRecord, "join request from client");
        }).thenCompose(Function.identity());
    }

    /**
     * Requests termination of a job in the given mode.
     * <p>
     * For a job that already has a result, a {@code CANCEL_FORCEFUL} request is
     * ignored and any other mode fails. For a job without a master context the call
     * fails with a retryable exception if a job record exists, otherwise with
     * {@code JobNotFoundException}. Modes other than {@code CANCEL_FORCEFUL} require
     * the job to be {@code RUNNING}.
     *
     * @param jobId           id of the job
     * @param terminationMode requested termination mode
     * @return future completed when the request was processed; exceptionally on the failures above
     * @throws JetException if this member is not the master
     */
    public CompletableFuture<Void> terminateJob(long jobId, TerminationMode terminationMode) {
        assertIsMaster("Cannot " + terminationMode + " job " + com.hazelcast.jet.Util.idToString(jobId)
                + " on non-master node");
        return submitToCoordinatorThread(() -> {
            JobResult jobResult = jobRepository.getJobResult(jobId);
            if (jobResult != null) {
                if (terminationMode != TerminationMode.CANCEL_FORCEFUL) {
                    throw new IllegalStateException("Cannot " + terminationMode + " job "
                            + com.hazelcast.jet.Util.idToString(jobId)
                            + " because it already has a result: " + jobResult);
                }
                logger.fine("Ignoring cancellation of a completed job " + com.hazelcast.jet.Util.idToString(jobId));
                return;
            }

            MasterContext masterContext = masterContexts.get(jobId);
            if (masterContext == null) {
                if (jobRepository.getJobRecord(jobId) == null) {
                    throw new JobNotFoundException(jobId);
                }
                // The record exists but no context is registered yet; the caller may retry.
                throw new RetryableHazelcastException("No MasterContext found for job "
                        + com.hazelcast.jet.Util.idToString(jobId) + " for " + terminationMode);
            }

            JobStatus jobStatus = masterContext.jobStatus();
            boolean forcefulCancel = terminationMode == TerminationMode.CANCEL_FORCEFUL;
            if (jobStatus != JobStatus.RUNNING && !forcefulCancel) {
                throw new IllegalStateException("Cannot " + terminationMode + ", job status is " + jobStatus
                        + ", should be " + JobStatus.RUNNING);
            }

            String failureReason = masterContext.jobContext().requestTermination(terminationMode, false).f1();
            if (failureReason != null) {
                throw new IllegalStateException("Cannot " + terminationMode + ": " + failureReason);
            }
        });
    }

    /**
     * Collects the ids of all jobs known to the repository together with the ids of
     * all jobs that have a registered master context, without duplicates.
     *
     * @return future with the list of job ids
     * @throws JetException if this member is not the master
     */
    public CompletableFuture<List<Long>> getAllJobIds() {
        assertIsMaster("Cannot query list of job ids on non-master node");
        return submitToCoordinatorThread(() -> {
            Set<Long> uniqueIds = new HashSet<Long>(jobRepository.getAllJobIds());
            uniqueIds.addAll(masterContexts.keySet());
            List<Long> result = new ArrayList<Long>(uniqueIds);
            return result;
        });
    }

    /**
     * Returns the ids of all jobs with the given name, ordered by descending
     * creation time.
     * <p>
     * Jobs that have a master context but no stored result are given
     * {@code Long.MAX_VALUE} as their sort key, so they are listed before jobs that
     * have a result.
     *
     * @param name job name to look up
     * @return future with the ordered list of job ids
     * @throws JetException if this member is not the master
     */
    public CompletableFuture<List<Long>> getJobIds(@Nonnull String name) {
        assertIsMaster("Cannot query list of job ids on non-master node");
        return submitToCoordinatorThread(() -> {
            // jobId -> sort key (creation time of the job result)
            Map<Long, Long> jobs = jobRepository.getJobResults(name).stream()
                    .collect(Collectors.toMap(JobResult::getJobId, JobResult::getCreationTime));
            for (MasterContext ctx : masterContexts.values()) {
                if (name.equals(ctx.jobConfig().getName())) {
                    jobs.putIfAbsent(ctx.jobId(), Long.MAX_VALUE);
                }
            }
            // Explicit type witness: a raw Map.Entry::getValue reference followed by
            // reversed() gives the compiler no target type to infer a Comparable key from.
            return jobs.entrySet().stream()
                    .sorted(Map.Entry.<Long, Long>comparingByValue().reversed())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
        });
    }

    /**
     * Determines the status of a job.
     * <p>
     * Lookup order: stored job result, registered master context, stored execution
     * record, then the job result once more (the job may have completed between the
     * first and the last lookup). A running job with a requested termination is
     * reported as {@code COMPLETING}.
     *
     * @param jobId id of the job
     * @return future with the job status; completed exceptionally with
     *         {@code JobNotFoundException} if nothing is known about the job
     * @throws JetException if this member is not the master
     */
    public CompletableFuture<JobStatus> getJobStatus(long jobId) {
        assertIsMaster("Cannot query status of job " + com.hazelcast.jet.Util.idToString(jobId)
                + " on non-master node");
        return submitToCoordinatorThread(() -> {
            JobResult jobResult = jobRepository.getJobResult(jobId);
            if (jobResult != null) {
                return jobResult.getJobStatus();
            }

            MasterContext masterContext = masterContexts.get(jobId);
            if (masterContext != null) {
                JobStatus status = masterContext.jobStatus();
                boolean terminationRequested =
                        status == JobStatus.RUNNING && masterContext.jobContext().requestedTerminationMode() != null;
                return terminationRequested ? JobStatus.COMPLETING : status;
            }

            JobExecutionRecord executionRecord = jobRepository.getJobExecutionRecord(jobId);
            if (executionRecord != null) {
                if (executionRecord.isSuspended()) {
                    return JobStatus.SUSPENDED;
                }
                return JobStatus.NOT_RUNNING;
            }

            jobResult = jobRepository.getJobResult(jobId);
            if (jobResult == null) {
                throw new JobNotFoundException(jobId);
            }
            return jobResult.getJobStatus();
        });
    }

    /**
     * Returns the creation time of a job, read from its job record or, if there is
     * none, from its job result.
     *
     * @param jobId id of the job
     * @return future with the creation time; completed exceptionally with
     *         {@code JobNotFoundException} if neither a record nor a result exists
     * @throws JetException if this member is not the master
     */
    public CompletableFuture<Long> getJobSubmissionTime(long jobId) {
        assertIsMaster("Cannot query submission time of job " + com.hazelcast.jet.Util.idToString(jobId)
                + " on non-master node");
        return submitToCoordinatorThread(() -> {
            JobRecord jobRecord = jobRepository.getJobRecord(jobId);
            if (jobRecord == null) {
                JobResult jobResult = jobRepository.getJobResult(jobId);
                if (jobResult == null) {
                    throw new JobNotFoundException(jobId);
                }
                return jobResult.getCreationTime();
            }
            return jobRecord.getCreationTime();
        });
    }

    /**
     * Resumes the job with the given id through its master context.
     *
     * @param jobId id of the job to resume
     * @throws JetException         if this member is not the master
     * @throws JobNotFoundException if no master context is registered for the job
     */
    public void resumeJob(long jobId) {
        assertIsMaster("Cannot resume job " + com.hazelcast.jet.Util.idToString(jobId) + " on non-master node");
        MasterContext ctx = masterContexts.get(jobId);
        if (ctx != null) {
            ctx.jobContext().resumeJob(jobRepository::newExecutionId);
            return;
        }
        throw new JobNotFoundException("MasterContext not found to resume job "
                + com.hazelcast.jet.Util.idToString(jobId));
    }

    /**
     * Builds summaries of all jobs that have a job record or a job result, ordered
     * by descending submission time.
     * <p>
     * Summaries built from job results are added after those built from job
     * records, so for a job present in both the result-based summary wins.
     *
     * @return future with the ordered list of job summaries
     */
    public CompletableFuture<List<JobSummary>> getJobSummaryList() {
        return submitToCoordinatorThread(() -> {
            // Typed map: a raw HashMap would erase the stream below to Object and the
            // lambda could not produce a List<JobSummary>.
            Map<Long, JobSummary> jobs = new HashMap<>();
            jobRepository.getJobRecords().stream()
                    .map(this::getJobSummary)
                    .forEach(s -> jobs.put(s.getJobId(), s));
            jobRepository.getJobResults().stream()
                    .map(r -> new JobSummary(r.getJobId(), r.getJobNameOrId(), r.getJobStatus(),
                            r.getCreationTime(), r.getCompletionTime(), r.getFailureText()))
                    .forEach(s -> jobs.put(s.getJobId(), s));
            return jobs.values().stream()
                    .sorted(Comparator.comparing(JobSummary::getSubmissionTime).reversed())
                    .collect(Collectors.toList());
        });
    }

    /**
     * Registers a member that is shutting down and notifies every registered
     * master context about it.
     * <p>
     * Repeated calls for the same UUID return the future of the first call. If the
     * member is already listed in {@code removedMembers}, the registration is undone
     * and a completed future is returned.
     *
     * @param uuid UUID of the member that is shutting down
     * @return future completed once all {@code onParticipantGracefulShutdown}
     *         futures of the master contexts have completed
     */
    @Nonnull
    public CompletableFuture<Void> addShuttingDownMember(String uuid) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        CompletableFuture<Void> oldFuture = membersShuttingDown.putIfAbsent(uuid, future);
        if (oldFuture != null) {
            return oldFuture;
        }
        if (removedMembers.containsKey(uuid)) {
            LoggingUtil.logFine(logger, "NotifyMemberShutdownOperation received for a member that was already removed from the cluster: %s", uuid);
            // The member is already gone, so presumably no further onMemberRemoved() call
            // will clean this entry up. Leaving it would keep shouldStartJobs() false and
            // hand a never-completing future to any later caller for the same uuid.
            membersShuttingDown.remove(uuid, future);
            future.complete(null);
            return future;
        }
        LoggingUtil.logFine(logger, "Added a shutting-down member: %s", uuid);
        CompletableFuture[] futures = masterContexts.values().stream()
                .map(mc -> mc.jobContext().onParticipantGracefulShutdown(uuid))
                .toArray(CompletableFuture[]::new);
        // Runs even when there are no master contexts: allOf() of an empty array is already complete.
        CompletableFuture.allOf(futures)
                .whenComplete(ExceptionUtil.withTryCatch(logger, (r, e) -> future.complete(null)));
        return future;
    }

    /**
     * @return a new {@code HashMap} holding a copy of the currently registered master contexts
     */
    public Map<Long, MasterContext> getMasterContexts() {
        Map<Long, MasterContext> snapshot = new HashMap<>(masterContexts);
        return snapshot;
    }

    /**
     * @param jobId id of the job
     * @return the master context registered for the job, or {@code null} if there is none
     */
    public MasterContext getMasterContext(long jobId) {
        return masterContexts.get(jobId);
    }

    /**
     * @return the Jet service passed to the constructor
     */
    JetService getJetService() {
        return jetService;
    }

    /**
     * Decides whether this member may start jobs right now.
     * <p>
     * Requires that this member is a running master, the cluster is neither
     * entering passive state nor {@code PASSIVE} / {@code IN_TRANSITION}, all other
     * members report the same cluster state, no member is shutting down, and the
     * partition service is initialized, allows migration tasks and has no local
     * migration in progress.
     *
     * @return {@code true} if jobs may be started
     */
    boolean shouldStartJobs() {
        ClusterState clusterState = nodeEngine.getClusterService().getClusterState();
        if (!isMaster() || !nodeEngine.isRunning() || isClusterEnteringPassiveState) {
            return false;
        }
        if (clusterState == ClusterState.PASSIVE || clusterState == ClusterState.IN_TRANSITION) {
            return false;
        }
        if (!allMembersHaveSameState(clusterState)) {
            LoggingUtil.logFine(logger, "Not starting jobs because not all members have the same state: %s", clusterState);
            return false;
        }
        if (!membersShuttingDown.isEmpty()) {
            LoggingUtil.logFine(logger, "Not starting jobs because members are shutting down: %s", membersShuttingDown.keySet());
            return false;
        }
        InternalPartitionServiceImpl partitions = getInternalPartitionService();
        if (!partitions.getPartitionStateManager().isInitialized()) {
            return false;
        }
        if (!partitions.areMigrationTasksAllowed()) {
            return false;
        }
        return !partitions.hasOnGoingMigrationLocal();
    }

    /**
     * Asks every non-local member for its cluster metadata and checks that each
     * reported state equals the given one.
     * <p>
     * All requests are sent first and awaited afterwards. Members whose request
     * failed because they left or are no longer members are skipped. Any other
     * failure is logged and makes the check fail.
     *
     * @param clusterState the state every member is expected to report
     * @return {@code true} if all answering members report {@code clusterState}
     */
    private boolean allMembersHaveSameState(ClusterState clusterState) {
        try {
            Set<Member> members = nodeEngine.getClusterService().getMembers();
            List<Future<ClusterMetadata>> futures = members.stream()
                    .filter(member -> !member.localMember())
                    .map(this::clusterMetadataAsync)
                    .collect(Collectors.toList());
            return futures.stream()
                    .map(JobCoordinationService::metadataOrNullIfMemberGone)
                    .filter(Objects::nonNull)
                    .allMatch(metaData -> metaData.getState() == clusterState);
        } catch (Exception e) {
            logger.warning("Exception during member state check", e);
            return false;
        }
    }

    /**
     * Waits for a metadata response; returns {@code null} when the target member
     * left the cluster or is not a member, rethrows every other failure.
     */
    private static ClusterMetadata metadataOrNullIfMemberGone(Future<ClusterMetadata> future) {
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof MemberLeftException || cause instanceof TargetNotMemberException) {
                return null;
            }
            throw ExceptionUtil.sneakyThrow(e);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller's thread instead of swallowing it.
            Thread.currentThread().interrupt();
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    /**
     * Invokes a {@code GetClusterMetadataOperation} on the given member.
     *
     * @param member target member
     * @return future of the member's cluster metadata
     */
    private Future<ClusterMetadata> clusterMetadataAsync(Member member) {
        Operation operation = new GetClusterMetadataOperation();
        return nodeEngine.getOperationService()
                .invokeOnTarget("hz:impl:jetService", operation, member.getAddress());
    }

    /**
     * Reacts to a member joining: forgets the member in {@code removedMembers} and,
     * unless it is a lite member, updates job quorum values and schedules a
     * scale-up after the configured delay.
     *
     * @param addedMember the member that joined
     */
    void onMemberAdded(MemberImpl addedMember) {
        removedMembers.remove(addedMember.getUuid());
        if (!addedMember.isLiteMember()) {
            updateQuorumValues();
            scheduleScaleUp(config.getInstanceConfig().getScaleUpDelayMillis());
        }
    }

    /**
     * Reacts to a member leaving the cluster.
     * <p>
     * If the member had announced its shutdown, its entry in
     * {@code membersShuttingDown} is removed. Otherwise the member is remembered in
     * {@code removedMembers} with the current {@code System.nanoTime()}. Finally,
     * entries of {@code removedMembers} older than one hour are dropped.
     *
     * @param uuid UUID of the removed member
     */
    void onMemberRemoved(String uuid) {
        if (membersShuttingDown.remove(uuid) != null) {
            LoggingUtil.logFine(logger, "Removed a shutting-down member: %s, now shuttingDownMembers=%s", uuid, membersShuttingDown.keySet());
        } else {
            removedMembers.put(uuid, System.nanoTime());
        }
        // nanoTime() values may only be compared through their difference: the absolute
        // values can wrap around, so "value < threshold" is not overflow-safe.
        long now = System.nanoTime();
        long maxAgeNanos = TimeUnit.HOURS.toNanos(1L);
        removedMembers.entrySet().removeIf(en -> now - en.getValue() > maxAgeNanos);
    }

    /**
     * @param quorumSize required number of data members
     * @return {@code true} if the current data member count is at least {@code quorumSize}
     */
    boolean isQuorumPresent(int quorumSize) {
        int dataMembers = getDataMemberCount();
        return dataMembers >= quorumSize;
    }

    /**
     * Stores the completion of a job in the repository and unregisters its master
     * context. Runs on the coordinator thread.
     * <p>
     * The context is only removed if the given instance is the one registered for
     * the job; otherwise a severe message is logged.
     *
     * @param masterContext  context of the completed job
     * @param completionTime completion time passed on to the repository
     * @param error          failure passed on to the repository
     * @return future completed when the completion was processed
     */
    @CheckReturnValue
    CompletableFuture<Void> completeJob(MasterContext masterContext, long completionTime, Throwable error) {
        return submitToCoordinatorThread(() -> {
            long jobId = masterContext.jobId();
            String coordinatorUuid = nodeEngine.getNode().getThisUuid();
            jobRepository.completeJob(jobId, coordinatorUuid, completionTime, error);

            if (masterContexts.remove(masterContext.jobId(), masterContext)) {
                logger.fine(masterContext.jobIdString() + " is completed");
                return;
            }
            MasterContext registered = masterContexts.get(jobId);
            if (registered == null) {
                logger.severe("No master context found to complete " + masterContext.jobIdString());
            } else {
                logger.severe("Different master context found to complete " + masterContext.jobIdString()
                        + ", master context execution " + com.hazelcast.jet.Util.idToString(registered.executionId()));
            }
        });
    }

    /**
     * Schedules {@code restartJob(jobId)} on the coordinator executor after
     * {@code RETRY_DELAY_IN_MILLIS}. Logs a severe message and does nothing if the
     * job has no master context.
     *
     * @param jobId id of the job to restart
     */
    void scheduleRestart(long jobId) {
        MasterContext ctx = masterContexts.get(jobId);
        if (ctx != null) {
            logger.fine("Scheduling restart on master for job " + ctx.jobName());
            nodeEngine.getExecutionService().schedule(
                    COORDINATOR_EXECUTOR_NAME, () -> restartJob(jobId), RETRY_DELAY_IN_MILLIS, TimeUnit.MILLISECONDS);
            return;
        }
        logger.severe("Master context for job " + com.hazelcast.jet.Util.idToString(jobId)
                + " not found to schedule restart");
    }

    /**
     * Schedules the next snapshot of a job on the coordinator executor, delayed by
     * the job's configured snapshot interval.
     *
     * @param mc          master context of the job
     * @param executionId execution the snapshot is scheduled for
     */
    void scheduleSnapshot(MasterContext mc, long executionId) {
        long intervalMillis = mc.jobConfig().getSnapshotIntervalMillis();
        InternalExecutionService executor = nodeEngine.getExecutionService();
        if (logger.isFineEnabled()) {
            logger.fine(mc.jobIdString() + " snapshot is scheduled in " + intervalMillis + "ms");
        }
        executor.schedule(
                COORDINATOR_EXECUTOR_NAME,
                () -> mc.snapshotContext().startScheduledSnapshot(executionId),
                intervalMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Tries to start the job with the given id again. Logs a severe message and
     * does nothing if the job has no master context.
     *
     * @param jobId id of the job to restart
     */
    void restartJob(long jobId) {
        MasterContext ctx = masterContexts.get(jobId);
        if (ctx != null) {
            tryStartJob(ctx);
            return;
        }
        logger.severe("Master context for job " + com.hazelcast.jet.Util.idToString(jobId)
                + " not found to restart");
    }

    /**
     * @throws EnteringPassiveClusterStateException if the cluster is entering passive state
     */
    private void checkOperationalState() {
        boolean enteringPassive = isClusterEnteringPassiveState;
        if (enteringPassive) {
            throw new EnteringPassiveClusterStateException();
        }
    }

    /**
     * Schedules a scale-up attempt. The incremented counter value is captured by
     * the task so that {@code scaleJobsUpNow} can tell whether a newer attempt was
     * scheduled in the meantime.
     *
     * @param delay delay in milliseconds
     */
    private void scheduleScaleUp(long delay) {
        int ticket = scaleUpScheduledCount.incrementAndGet();
        Runnable attempt = () -> scaleJobsUpNow(ticket);
        nodeEngine.getExecutionService().schedule(attempt, delay, TimeUnit.MILLISECONDS);
    }

    /**
     * Performs a scheduled scale-up attempt.
     * <p>
     * Does nothing if a newer attempt was scheduled after this one. If jobs may not
     * be started right now, or if any job reports that it could not scale up, a new
     * attempt is scheduled after {@code RETRY_DELAY_IN_MILLIS}.
     *
     * @param counter value of {@code scaleUpScheduledCount} when this attempt was scheduled
     */
    private void scaleJobsUpNow(int counter) {
        if (counter != scaleUpScheduledCount.get()) {
            // superseded by a later scheduleScaleUp() call
            return;
        }
        if (!shouldStartJobs()) {
            scheduleScaleUp(RETRY_DELAY_IN_MILLIS);
            return;
        }

        int dataMembers = nodeEngine.getClusterService().getMembers(MemberSelectors.DATA_MEMBER_SELECTOR).size();
        int partitions = nodeEngine.getPartitionService().getPartitionCount();
        int membersWithPartitions = Math.min(dataMembers, partitions);

        boolean everyJobSucceeded = true;
        for (MasterContext mc : masterContexts.values()) {
            // Every job is asked, even after an earlier one has failed.
            boolean succeeded = mc.jobContext().maybeScaleUp(membersWithPartitions);
            everyJobSucceeded = everyJobSucceeded && succeeded;
        }
        if (!everyJobSucceeded) {
            scheduleScaleUp(RETRY_DELAY_IN_MILLIS);
        }
    }

    /**
     * Updates the quorum size of every job with split-brain protection enabled to
     * the current quorum size. Runs on the coordinator thread.
     * <p>
     * For a job without a master context the value is updated in the repository
     * and the context is looked up once more; if a context is present (initially or
     * after that second lookup), it is updated as well. Failures are logged per job
     * and do not stop processing of the remaining jobs.
     */
    private void updateQuorumValues() {
        if (!shouldCheckQuorumValues()) {
            return;
        }
        submitToCoordinatorThread(() -> {
            try {
                int quorumSize = getQuorumSize();
                for (JobRecord jobRecord : jobRepository.getJobRecords()) {
                    try {
                        if (jobRecord.getConfig().isSplitBrainProtectionEnabled()) {
                            MasterContext ctx = masterContexts.get(jobRecord.getJobId());
                            if (ctx == null) {
                                jobRepository.updateJobQuorumSizeIfSmaller(jobRecord.getJobId(), quorumSize);
                                // the context may have been registered in the meantime
                                ctx = masterContexts.get(jobRecord.getJobId());
                            }
                            if (ctx != null) {
                                ctx.updateQuorumSize(quorumSize);
                            }
                        }
                    } catch (Exception e) {
                        logger.severe("Quorum of job " + com.hazelcast.jet.Util.idToString(jobRecord.getJobId())
                                + " could not be updated to " + quorumSize, e);
                    }
                }
            } catch (Exception e) {
                logger.severe("update quorum values task failed", e);
            }
        });
    }

    /**
     * @return {@code true} if this member is a running master and the partition state is initialized
     */
    private boolean shouldCheckQuorumValues() {
        if (!isMaster() || !nodeEngine.isRunning()) {
            return false;
        }
        return getInternalPartitionService().getPartitionStateManager().isInitialized();
    }

    /**
     * Deserializes the DAG with the job's class loader and renders it as JSON,
     * using the configured cooperative thread count.
     *
     * @param jobId     id of the job
     * @param jobConfig configuration of the job
     * @param dagData   serialized DAG
     * @return JSON text of the DAG
     */
    private String dagToJson(long jobId, JobConfig jobConfig, Data dagData) {
        ClassLoader jobClassLoader = jetService.getJobExecutionService().getClassLoader(jobConfig, jobId);
        DAG dag = (DAG) CustomClassLoadedObject.deserializeWithCustomClassLoader(
                nodeEngine.getSerializationService(), jobClassLoader, dagData);
        int cooperativeThreads = Util.getJetInstance((NodeEngine) nodeEngine)
                .getConfig()
                .getInstanceConfig()
                .getCooperativeThreadCount();
        return dag.toJson(cooperativeThreads).toString();
    }

    /**
     * Registers a master context for the job and starts the job unless it is
     * already completed, already registered, or suspended.
     *
     * @param jobRecord          record of the job
     * @param jobExecutionRecord execution record of the job
     * @param reason             text appended to the "Starting job" log message
     * @return the completion future of the job: that of its stored result, of the
     *         already registered context, or of the newly registered context
     * @throws EnteringPassiveClusterStateException if the cluster is entering passive state
     */
    private CompletableFuture<Void> startJobIfNotStartedOrCompleted(@Nonnull JobRecord jobRecord, @Nonnull JobExecutionRecord jobExecutionRecord, String reason) {
        long jobId = jobRecord.getJobId();
        JobResult storedResult = jobRepository.getJobResult(jobId);
        if (storedResult != null) {
            logger.fine("Not starting job " + com.hazelcast.jet.Util.idToString(jobId)
                    + ", already has result: " + storedResult);
            return storedResult.asCompletableFuture();
        }

        MasterContext newContext;
        MasterContext registeredContext;
        synchronized (lock) {
            checkOperationalState();
            newContext = createMasterContext(jobRecord, jobExecutionRecord);
            registeredContext = masterContexts.putIfAbsent(jobId, newContext);
        }
        if (registeredContext != null) {
            // another context is already in charge of this job
            return registeredContext.jobContext().jobCompletionFuture();
        }

        if (!completeMasterContextIfJobAlreadyCompleted(newContext)) {
            if (jobExecutionRecord.isSuspended()) {
                LoggingUtil.logFinest(logger, "MasterContext for suspended %s is created", newContext.jobIdString());
            } else {
                logger.info("Starting job " + com.hazelcast.jet.Util.idToString(jobId) + ": " + reason);
                tryStartJob(newContext);
            }
        }
        return newContext.jobContext().jobCompletionFuture();
    }

    /**
     * Completes and unregisters the given master context if the job already has a
     * stored result.
     * <p>
     * If there is no result, auto-scaling is disabled and the job has at least one
     * recorded execution id, the job is finalized with a
     * {@code TopologyChangedException}.
     *
     * @param masterContext context to check
     * @return {@code true} only if a result existed and this context was removed from
     *         {@code masterContexts}
     */
    private boolean completeMasterContextIfJobAlreadyCompleted(MasterContext masterContext) {
        long jobId = masterContext.jobId();
        JobResult storedResult = jobRepository.getJobResult(jobId);
        if (storedResult == null) {
            boolean executedBefore =
                    !masterContext.jobConfig().isAutoScaling() && jobRepository.getExecutionIdCount(jobId) > 0L;
            if (executedBefore) {
                logger.info("Suspending or failing " + masterContext.jobIdString()
                        + " since auto-restart is disabled and the job has been executed before");
                masterContext.jobContext().finalizeJob(new TopologyChangedException());
            }
            // NOTE(review): false is returned even after finalizeJob(), so callers go on
            // with their start logic - confirm this is intended.
            return false;
        }
        logger.fine("Completing master context for " + masterContext.jobIdString()
                + " since already completed with result: " + storedResult);
        masterContext.jobContext().setFinalResult(storedResult.getFailureAsThrowable());
        return masterContexts.remove(jobId, masterContext);
    }

    /**
     * Asks the job context to try starting the job, passing the repository's
     * {@code newExecutionId} method as the execution id source.
     *
     * @param masterContext context of the job to start
     */
    private void tryStartJob(MasterContext masterContext) {
        masterContext.jobContext().tryStartJob(jobRepository::newExecutionId);
    }

    /**
     * @return half of the current data member count (integer division) plus one
     */
    private int getQuorumSize() {
        int dataMembers = getDataMemberCount();
        return dataMembers / 2 + 1;
    }

    /**
     * @return number of cluster members selected by {@code MemberSelectors.DATA_MEMBER_SELECTOR}
     */
    private int getDataMemberCount() {
        return nodeEngine.getClusterService()
                .getMembers(MemberSelectors.DATA_MEMBER_SELECTOR)
                .size();
    }

    /**
     * Builds the summary of a job that has a job record.
     * <p>
     * With a registered master context, the execution id and status come from the
     * context. Without one, the execution id is 0 and the status is
     * {@code SUSPENDED} if the stored execution record says so, otherwise
     * {@code NOT_RUNNING}.
     *
     * @param record record of the job
     * @return summary of the job
     */
    private JobSummary getJobSummary(JobRecord record) {
        MasterContext ctx = masterContexts.get(record.getJobId());
        long execId;
        JobStatus status;
        if (ctx != null) {
            execId = ctx.executionId();
            status = ctx.jobStatus();
        } else {
            execId = 0L;
            JobExecutionRecord executionRecord = jobRepository.getJobExecutionRecord(record.getJobId());
            boolean suspended = executionRecord != null && executionRecord.isSuspended();
            status = suspended ? JobStatus.SUSPENDED : JobStatus.NOT_RUNNING;
        }
        return new JobSummary(record.getJobId(), execId, record.getJobNameOrId(), status, record.getCreationTime());
    }

    /**
     * @return the node's partition service, cast to {@code InternalPartitionServiceImpl}
     */
    private InternalPartitionServiceImpl getInternalPartitionService() {
        return (InternalPartitionServiceImpl) nodeEngine.getNode().getPartitionService();
    }

    /**
     * Periodic task: starts every job found in the repository that is neither
     * started nor completed, then runs the repository cleanup and marks the first
     * scan as done.
     * <p>
     * Does nothing while {@code shouldStartJobs()} is {@code false}. A
     * {@code HazelcastInstanceNotActiveException} is ignored silently; any other
     * exception is logged.
     */
    private void scanJobs() {
        try {
            if (!shouldStartJobs()) {
                return;
            }
            Collection<JobRecord> jobRecords = jobRepository.getJobRecords();
            for (JobRecord jobRecord : jobRecords) {
                long jobId = jobRecord.getJobId();
                JobExecutionRecord executionRecord =
                        ensureExecutionRecord(jobId, jobRepository.getJobExecutionRecord(jobRecord.getJobId()));
                startJobIfNotStartedOrCompleted(jobRecord, executionRecord, "discovered by scanning of JobRecords");
            }
            jobRepository.cleanup(nodeEngine);
            if (!jobsScanned) {
                synchronized (lock) {
                    jobsScanned = true;
                }
            }
        } catch (Exception e) {
            if (!(e instanceof HazelcastInstanceNotActiveException)) {
                logger.severe("Scanning jobs failed", e);
            }
        }
    }

    /**
     * Returns the given execution record, or a new non-suspended one with the
     * current quorum size if it is {@code null}.
     *
     * @param jobId  id of the job
     * @param record stored execution record, may be {@code null}
     * @return a non-null execution record
     */
    private JobExecutionRecord ensureExecutionRecord(long jobId, JobExecutionRecord record) {
        if (record == null) {
            return new JobExecutionRecord(jobId, getQuorumSize(), false);
        }
        return record;
    }

    /**
     * Fails if this member is not the master.
     *
     * @param error beginning of the exception message; the master address is appended
     * @throws JetException if this member is not the master
     */
    void assertIsMaster(String error) {
        if (isMaster()) {
            return;
        }
        throw new JetException(error + ". Master address: " + nodeEngine.getClusterService().getMasterAddress());
    }

    /**
     * @return {@code true} if the cluster service reports this member as the master
     */
    private boolean isMaster() {
        ClusterService clusterService = nodeEngine.getClusterService();
        return clusterService.isMaster();
    }

    /**
     * @return the node engine passed to the constructor
     */
    NodeEngineImpl nodeEngine() {
        return nodeEngine;
    }

    /**
     * Runs the action through the {@code Callable} variant of this method, with a
     * {@code null} result.
     *
     * @param action the action to run
     * @return future completed when the action has finished
     */
    CompletableFuture<Void> submitToCoordinatorThread(Runnable action) {
        Callable<Void> asCallable = () -> {
            action.run();
            return null;
        };
        return submitToCoordinatorThread(asCallable);
    }

    /**
     * Runs the action as a coordinator task.
     * <p>
     * If the calling thread is already running a coordinator task, the action is
     * executed inline and its outcome is returned as an already completed future.
     * Otherwise the action is submitted to the coordinator executor; the per-thread
     * flag is raised for the duration of the action and always lowered afterwards.
     *
     * @param action the action to run
     * @param <T>    result type of the action
     * @return future with the action's result, or completed exceptionally with its failure
     */
    private <T> CompletableFuture<T> submitToCoordinatorThread(Callable<T> action) {
        if (IS_JOB_COORDINATOR_THREAD.get()) {
            // Already on a coordinator thread: run inline instead of queueing again.
            try {
                return CompletableFuture.completedFuture(action.call());
            } catch (Throwable e) {
                return Util.exceptionallyCompletedFuture(e);
            }
        }
        // Typed Future<T>: a raw Future would erase T and force an unchecked conversion on return.
        Future<T> future = nodeEngine.getExecutionService().submit(COORDINATOR_EXECUTOR_NAME, () -> {
            assert !IS_JOB_COORDINATOR_THREAD.get() : "flag already raised";
            IS_JOB_COORDINATOR_THREAD.set(true);
            try {
                return action.call();
            } finally {
                IS_JOB_COORDINATOR_THREAD.set(false);
            }
        });
        return com.hazelcast.jet.Util.toCompletableFuture(nodeEngine.getExecutionService().asCompletableFuture(future));
    }
}

