/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.Member;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.impl.JetService;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.JobResult;
import com.hazelcast.jet.impl.JobSummary;
import com.hazelcast.jet.impl.MasterContext;
import com.hazelcast.jet.impl.SnapshotRepository;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.ShutdownInProgressException;
import com.hazelcast.jet.impl.execution.SnapshotRecord;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.JetGroupProperty;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.executionservice.InternalExecutionService;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.util.Clock;
import com.hazelcast.util.executor.ExecutorType;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public class JobCoordinationService {
    private static final String COORDINATOR_EXECUTOR_NAME = "jet:coordinator";
    private static final long RETRY_DELAY_IN_MILLIS = TimeUnit.SECONDS.toMillis(2L);
    private final NodeEngineImpl nodeEngine;
    private final JetService jetService;
    private final JetConfig config;
    private final ILogger logger;
    private final JobRepository jobRepository;
    private final SnapshotRepository snapshotRepository;
    private final ConcurrentMap<Long, MasterContext> masterContexts = new ConcurrentHashMap<Long, MasterContext>();
    private final Set<String> membersShuttingDown = Collections.newSetFromMap(new ConcurrentHashMap());
    private final Object lock = new Object();
    private volatile boolean isShutdown;
    private int awaitedTerminatingMembersCount;
    private CompletableFuture<Void> terminalSnapshotsFuture;
    private final AtomicInteger scaleUpScheduledCount = new AtomicInteger();

    /**
     * Creates the coordination service and stores its collaborators.
     * The logger is obtained from the given node engine for this class.
     *
     * @param nodeEngine         node engine used for cluster, execution and logging services
     * @param jetService         owning Jet service
     * @param config             Jet configuration
     * @param jobRepository      repository of job records and job results
     * @param snapshotRepository repository of snapshot records
     */
    JobCoordinationService(
            NodeEngineImpl nodeEngine,
            JetService jetService,
            JetConfig config,
            JobRepository jobRepository,
            SnapshotRepository snapshotRepository
    ) {
        this.snapshotRepository = snapshotRepository;
        this.jobRepository = jobRepository;
        this.config = config;
        this.jetService = jetService;
        this.nodeEngine = nodeEngine;
        this.logger = nodeEngine.getLogger(getClass());
    }

    /**
     * Registers the coordinator executor and schedules {@link #scanJobs()} on it,
     * using the {@code JOB_SCAN_PERIOD} property as both initial delay and period.
     */
    public void init() {
        InternalExecutionService execService = nodeEngine.getExecutionService();
        long scanPeriodMillis = new HazelcastProperties(config.getProperties())
                .getMillis(JetGroupProperty.JOB_SCAN_PERIOD);
        execService.register(COORDINATOR_EXECUTOR_NAME, 2, Integer.MAX_VALUE, ExecutorType.CACHED);
        execService.scheduleWithRepetition(
                COORDINATOR_EXECUTOR_NAME,
                this::scanJobs,
                scanPeriodMillis,
                scanPeriodMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Marks this service as shut down.
     * The flag is written while holding {@code lock}, the same monitor under which
     * new master contexts are registered.
     */
    public void shutdown() {
        synchronized (lock) {
            isShutdown = true;
        }
    }

    /**
     * Completes every tracked master context with a {@link CancellationException}
     * and then forgets all of them. Asserts that this member is not the master.
     */
    public void reset() {
        assert !isMaster() : "this member is a master";
        for (MasterContext ctx : masterContexts.values()) {
            ctx.setFinalResult(new CancellationException());
        }
        masterContexts.clear();
    }

    /**
     * Returns a copy of the job-id to master-context map as of the time of the call.
     *
     * @return a new {@link HashMap}; changing it does not affect this service
     */
    public Map<Long, MasterContext> getMasterContexts() {
        Map<Long, MasterContext> copy = new HashMap<>(masterContexts);
        return copy;
    }

    /**
     * Looks up the master context tracked for the given job.
     *
     * @param jobId id of the job
     * @return the master context, or {@code null} if none is tracked for this job
     */
    public MasterContext getMasterContext(long jobId) {
        return masterContexts.get(jobId);
    }

    /**
     * Reacts to a member joining the cluster. Lite members are ignored; for any other
     * member the quorum values are refreshed and a scale-up is scheduled after the
     * configured scale-up delay.
     *
     * @param addedMember the member that was added
     */
    void onMemberAdded(MemberImpl addedMember) {
        if (!addedMember.isLiteMember()) {
            updateQuorumValues();
            scheduleScaleUp(config.getInstanceConfig().getScaleUpDelayMillis());
        }
    }

    /**
     * Schedules a scale-up attempt after {@code delay} milliseconds.
     * Each call takes a new value of {@code scaleUpScheduledCount}; the scheduled task
     * carries that value so {@link #scaleJobsUpNow(int)} can detect whether a later
     * request was made in the meantime.
     *
     * @param delay delay in milliseconds
     */
    private void scheduleScaleUp(long delay) {
        final int ticket = scaleUpScheduledCount.incrementAndGet();
        Runnable scaleUpTask = () -> scaleJobsUpNow(ticket);
        nodeEngine.getExecutionService().schedule(scaleUpTask, delay, TimeUnit.MILLISECONDS);
    }

    /**
     * Executes a scale-up attempt that was scheduled by {@link #scheduleScaleUp(long)}.
     * <p>
     * The attempt is dropped when a newer one has been scheduled since. If jobs cannot be
     * started at the moment, the attempt is re-scheduled and nothing else is done in this
     * pass. Otherwise every tracked master context is asked to scale up to the current data
     * members, and a retry is scheduled if any of them reports failure.
     *
     * @param counter value of {@code scaleUpScheduledCount} captured when this attempt was scheduled
     */
    private void scaleJobsUpNow(int counter) {
        // A later scale-up request supersedes this one.
        if (scaleUpScheduledCount.get() != counter) {
            return;
        }
        // Not allowed to start jobs right now: retry later and leave the running jobs alone.
        // Without this return the pass would go on to scale jobs up anyway and could
        // schedule a second retry on top of the one just requested.
        if (!shouldStartJobs()) {
            scheduleScaleUp(RETRY_DELAY_IN_MILLIS);
            return;
        }
        boolean allSucceeded = true;
        Collection<Member> dataMembers =
                nodeEngine.getClusterService().getMembers(MemberSelectors.DATA_MEMBER_SELECTOR);
        for (MasterContext mc : masterContexts.values()) {
            // Non-short-circuit '&=' so that every context is visited even after a failure.
            allSucceeded &= mc.maybeScaleUp(dataMembers);
        }
        if (!allSucceeded) {
            scheduleScaleUp(RETRY_DELAY_IN_MILLIS);
        }
    }

    /**
     * Raises the stored quorum size of jobs with split-brain protection enabled when the
     * quorum size derived from the current data member count is larger than the recorded one.
     * When the repository reports an update, the tracked master context (if any) is updated
     * as well. Failures are logged and never propagated.
     */
    private void updateQuorumValues() {
        if (!shouldCheckQuorumValues()) {
            return;
        }
        try {
            final int newQuorumSize = getQuorumSize();
            for (JobRecord record : jobRepository.getJobRecords()) {
                if (!record.getConfig().isSplitBrainProtectionEnabled()) {
                    continue;
                }
                if (newQuorumSize <= record.getQuorumSize()) {
                    continue;
                }
                boolean updated =
                        jobRepository.updateJobQuorumSizeIfLargerThanCurrent(record.getJobId(), newQuorumSize);
                if (!updated) {
                    continue;
                }
                try {
                    MasterContext ctx = masterContexts.get(record.getJobId());
                    if (ctx != null) {
                        ctx.updateQuorumSize(newQuorumSize);
                    }
                    logger.info("Current quorum size: " + record.getQuorumSize() + " of job "
                            + com.hazelcast.jet.Util.idToString(record.getJobId())
                            + " is updated to: " + newQuorumSize);
                } catch (Exception e) {
                    logger.severe("Quorum of job " + com.hazelcast.jet.Util.idToString(record.getJobId())
                            + " could not be updated to " + newQuorumSize + " in its MasterContext object", e);
                }
            }
        } catch (Exception e) {
            logger.severe("update quorum values task failed", e);
        }
    }

    /**
     * Tells whether quorum values may be checked now: this member must be the master,
     * the node engine must be running and the partition state must be initialized.
     *
     * @return {@code true} if all three conditions hold
     */
    private boolean shouldCheckQuorumValues() {
        if (!isMaster()) {
            return false;
        }
        if (!nodeEngine.isRunning()) {
            return false;
        }
        return getInternalPartitionService().getPartitionStateManager().isInitialized();
    }

    /**
     * Handles a job submission on the master member.
     * <p>
     * The order of the steps below matters: the job result is checked first, the master
     * context is registered under {@code lock}, the result is re-checked, and only then is
     * the job record written and the start handed to the coordinator executor.
     *
     * @param jobId  id of the submitted job
     * @param dag    serialized DAG of the job
     * @param config configuration of the job (shadows the {@code config} field inside this method)
     * @throws JetException                 if this member is not the master
     * @throws ShutdownInProgressException if this service has been shut down
     */
    public void submitJob(long jobId, Data dag, JobConfig config) {
        this.assertIsMaster("Cannot submit job " + com.hazelcast.jet.Util.idToString(jobId) + " from non-master node");
        if (this.isShutdown) {
            throw new ShutdownInProgressException();
        }
        // A stored result means the job already completed; there is nothing to start.
        JobResult jobResult = this.jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            this.logger.fine("Not starting job " + com.hazelcast.jet.Util.idToString(jobId) + " since already completed with result: " + jobResult);
            return;
        }
        // Quorum size is recorded only when split-brain protection is enabled, otherwise 0.
        int quorumSize = config.isSplitBrainProtectionEnabled() ? this.getQuorumSize() : 0;
        String dagJson = this.dagToJson(jobId, config, dag);
        JobRecord jobRecord = new JobRecord(jobId, Clock.currentTimeMillis(), dag, dagJson, config, quorumSize, false);
        MasterContext masterContext = new MasterContext(this.nodeEngine, this, jobRecord);
        Object object = this.lock;
        synchronized (object) {
            // Re-check the shutdown flag under the lock before registering the context.
            if (this.isShutdown) {
                throw new ShutdownInProgressException();
            }
            // Only the first submission registers a context; later ones find the existing one and stop here.
            MasterContext prev = this.masterContexts.putIfAbsent(jobId, masterContext);
            if (prev != null) {
                this.logger.fine("Joining to already existing masterContext " + prev.jobIdString());
                return;
            }
        }
        // The job may have completed between the first result check and the registration above.
        if (this.completeMasterContextIfJobAlreadyCompleted(masterContext)) {
            return;
        }
        this.jobRepository.putNewJobRecord(jobRecord);
        this.logger.info("Starting job " + com.hazelcast.jet.Util.idToString(masterContext.jobId()) + " based on submit request from client");
        // The actual start runs asynchronously on the coordinator executor.
        this.nodeEngine.getExecutionService().execute(COORDINATOR_EXECUTOR_NAME, () -> this.tryStartJob(masterContext));
    }

    /**
     * Deserializes the DAG with the job's class loader and renders it as a JSON string.
     *
     * @param jobId     id of the job
     * @param jobConfig configuration of the job, used to obtain the job class loader
     * @param dagData   serialized DAG
     * @return string form of {@code dag.toJson(cooperativeThreadCount)}
     */
    private String dagToJson(long jobId, JobConfig jobConfig, Data dagData) {
        ClassLoader jobClassLoader = jetService.getJobExecutionService().getClassLoader(jobConfig, jobId);
        DAG deserializedDag = (DAG) CustomClassLoadedObject.deserializeWithCustomClassLoader(
                nodeEngine.getSerializationService(), jobClassLoader, dagData);
        int cooperativeThreads = Util.getJetInstance(nodeEngine)
                .getConfig()
                .getInstanceConfig()
                .getCooperativeThreadCount();
        return deserializedDag.toJson(cooperativeThreads).toString();
    }

    /**
     * Returns a future for the outcome of a previously submitted job.
     * If a job record exists, the job is started when necessary and its completion future is
     * returned; otherwise the stored job result is returned as a future.
     *
     * @param jobId id of the job
     * @return future for the job outcome
     * @throws JetException                 if this member is not the master
     * @throws ShutdownInProgressException if this service has been shut down
     * @throws JobNotFoundException         if neither a record nor a result exists for the job
     */
    public CompletableFuture<Void> joinSubmittedJob(long jobId) {
        assertIsMaster("Cannot join job " + com.hazelcast.jet.Util.idToString(jobId) + " from non-master node");
        if (isShutdown) {
            throw new ShutdownInProgressException();
        }
        JobRecord record = jobRepository.getJobRecord(jobId);
        if (record == null) {
            JobResult result = jobRepository.getJobResult(jobId);
            if (result == null) {
                throw new JobNotFoundException(jobId);
            }
            return result.asCompletableFuture();
        }
        return startJobIfNotStartedOrCompleted(record, "join request from client", false);
    }

    /**
     * Starts the job described by {@code jobRecord} unless it already has a result or is
     * already tracked by a master context.
     *
     * @param jobRecord record of the job
     * @param reason    text appended to the "Starting job" log line
     * @param resume    if {@code true} and an existing master context is {@code SUSPENDED}, that context is resumed
     * @return future of the stored job result, or the completion future of the existing or newly created master context
     * @throws ShutdownInProgressException if this service has been shut down
     */
    private CompletableFuture<Void> startJobIfNotStartedOrCompleted(JobRecord jobRecord, String reason, boolean resume) {
        MasterContext oldMasterContext;
        MasterContext masterContext;
        long jobId = jobRecord.getJobId();
        // A stored result means the job already finished.
        JobResult jobResult = this.jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            this.logger.fine("Not starting job " + com.hazelcast.jet.Util.idToString(jobId) + ", already has result: " + jobResult);
            return jobResult.asCompletableFuture();
        }
        Object object = this.lock;
        synchronized (object) {
            // The shutdown check and the registration happen under the same lock.
            if (this.isShutdown) {
                throw new ShutdownInProgressException();
            }
            masterContext = new MasterContext(this.nodeEngine, this, jobRecord);
            oldMasterContext = this.masterContexts.putIfAbsent(jobId, masterContext);
        }
        if (oldMasterContext != null) {
            // A context was already registered: optionally resume it and hand out its future.
            if (resume && oldMasterContext.jobStatus() == JobStatus.SUSPENDED) {
                oldMasterContext.resumeJob(this.jobRepository::newExecutionId);
            }
            return oldMasterContext.completionFuture();
        }
        // The job may have completed between the first result check and the registration above.
        if (this.completeMasterContextIfJobAlreadyCompleted(masterContext)) {
            return masterContext.completionFuture();
        }
        this.logger.info("Starting job " + com.hazelcast.jet.Util.idToString(masterContext.jobId()) + ": " + reason);
        this.tryStartJob(masterContext);
        return masterContext.completionFuture();
    }

    /**
     * Decides whether a freshly registered master context must NOT be started.
     * <ul>
     *   <li>If a job result exists, the context is completed with that result's failure and
     *       removed from {@code masterContexts}; the outcome of the removal is returned.</li>
     *   <li>If auto-scaling is disabled for the job and the repository reports at least one
     *       earlier execution id, the context is finalized with a
     *       {@link TopologyChangedException} and {@code true} is returned.</li>
     * </ul>
     *
     * @param masterContext the context to check
     * @return {@code true} if the caller must not start the job, {@code false} otherwise
     */
    private boolean completeMasterContextIfJobAlreadyCompleted(MasterContext masterContext) {
        long jobId = masterContext.jobId();
        JobResult jobResult = jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            logger.fine("Completing master context for " + masterContext.jobIdString()
                    + " since already completed with result: " + jobResult);
            masterContext.setFinalResult(jobResult.getFailure());
            return masterContexts.remove(jobId, masterContext);
        }
        if (!masterContext.jobConfig().isAutoScaling() && jobRepository.getExecutionIdCount(jobId) > 0L) {
            logger.info("Suspending or failing " + masterContext.jobIdString()
                    + " since auto-restart is disabled and the job has been executed before");
            masterContext.finalizeJob(new TopologyChangedException());
            // The job has just been finalized; report that so the caller does not start it.
            return true;
        }
        return false;
    }

    /**
     * Asks the master context to try to start its job, passing the repository's
     * {@code newExecutionId} method as the execution-id source.
     *
     * @param masterContext context of the job to start
     */
    private void tryStartJob(MasterContext masterContext) {
        masterContext.tryStartJob(this.jobRepository::newExecutionId);
    }

    /**
     * Computes the quorum size from the current data member count.
     *
     * @return half of the data member count (integer division) plus one
     */
    private int getQuorumSize() {
        int dataMembers = getDataMemberCount();
        return dataMembers / 2 + 1;
    }

    /**
     * Checks whether the cluster currently has at least {@code quorumSize} data members.
     *
     * @param quorumSize required number of data members
     * @return {@code true} if the current data member count is not below {@code quorumSize}
     */
    boolean isQuorumPresent(int quorumSize) {
        int dataMembers = getDataMemberCount();
        return quorumSize <= dataMembers;
    }

    /**
     * Counts the members matched by {@code MemberSelectors.DATA_MEMBER_SELECTOR}.
     *
     * @return number of data members currently known to the cluster service
     */
    private int getDataMemberCount() {
        return nodeEngine.getClusterService()
                .getMembers(MemberSelectors.DATA_MEMBER_SELECTOR)
                .size();
    }

    /**
     * Requests termination of a job in the given mode.
     * <p>
     * Cancelling a job that already has a result, or that is already being cancelled, is
     * silently ignored. Modes other than {@code CANCEL} are accepted only while the job
     * is {@code RUNNING}.
     *
     * @param jobId           id of the job
     * @param terminationMode requested termination mode
     * @throws JetException                if this member is not the master
     * @throws IllegalStateException       if the job has a result, is not running (for non-cancel modes)
     *                                     or is already terminating in a different mode
     * @throws RetryableHazelcastException if a job record exists but no master context is tracked yet
     * @throws JobNotFoundException        if neither a master context nor a job record exists
     */
    public void terminateJob(long jobId, TerminationMode terminationMode) {
        assertIsMaster("Cannot " + terminationMode + " job " + com.hazelcast.jet.Util.idToString(jobId)
                + " from non-master node");
        final boolean isCancel = terminationMode == TerminationMode.CANCEL;

        JobResult result = jobRepository.getJobResult(jobId);
        if (result != null) {
            if (!isCancel) {
                throw new IllegalStateException("Cannot " + terminationMode + " job "
                        + com.hazelcast.jet.Util.idToString(jobId) + " because it already has a result: " + result);
            }
            logger.fine("Ignoring cancellation of a completed job " + com.hazelcast.jet.Util.idToString(jobId));
            return;
        }

        MasterContext ctx = masterContexts.get(jobId);
        if (ctx == null) {
            JobRecord record = jobRepository.getJobRecord(jobId);
            String message = "No MasterContext found for job " + com.hazelcast.jet.Util.idToString(jobId)
                    + " for " + terminationMode;
            if (record == null) {
                throw new JobNotFoundException(jobId);
            }
            // A record without a context: the caller is told to retry.
            throw new RetryableHazelcastException(message);
        }

        JobStatus status = ctx.jobStatus();
        if (status != JobStatus.RUNNING && !isCancel) {
            throw new IllegalStateException("Cannot " + terminationMode + ", job status is " + status
                    + ", should be " + JobStatus.RUNNING);
        }

        if (ctx.requestTermination(terminationMode)) {
            return;
        }
        TerminationMode alreadyRequested = ctx.requestedTerminationMode();
        if (isCancel && alreadyRequested == TerminationMode.CANCEL) {
            return;
        }
        throw new IllegalStateException("Cannot " + terminationMode + ", job is already terminating in mode: "
                + alreadyRequested);
    }

    /**
     * Collects the ids of all jobs known to the repository together with the ids of all
     * jobs that currently have a master context.
     *
     * @return a new set of job ids
     * @throws JetException if this member is not the master
     */
    public Set<Long> getAllJobIds() {
        assertIsMaster("Cannot query list of job ids from non-master node");
        Set<Long> ids = new HashSet<>(jobRepository.getAllJobIds());
        ids.addAll(masterContexts.keySet());
        return ids;
    }

    /**
     * Determines the status of a job. Sources are consulted in this order: stored job
     * result, tracked master context, job record, and finally the job result once more.
     * A {@code RUNNING} context with a requested termination mode is reported as
     * {@code COMPLETING}.
     *
     * @param jobId id of the job
     * @return current status of the job
     * @throws JetException         if this member is not the master
     * @throws JobNotFoundException if the job is unknown
     */
    public JobStatus getJobStatus(long jobId) {
        assertIsMaster("Cannot query status of job " + com.hazelcast.jet.Util.idToString(jobId)
                + " from non-master node");

        JobResult result = jobRepository.getJobResult(jobId);
        if (result != null) {
            return result.getJobStatus();
        }

        MasterContext ctx = masterContexts.get(jobId);
        if (ctx != null) {
            JobStatus status = ctx.jobStatus();
            boolean terminating = status == JobStatus.RUNNING && ctx.requestedTerminationMode() != null;
            return terminating ? JobStatus.COMPLETING : status;
        }

        JobRecord record = jobRepository.getJobRecord(jobId);
        if (record != null) {
            if (record.isSuspended()) {
                return JobStatus.SUSPENDED;
            }
            return JobStatus.NOT_RUNNING;
        }

        // Second look at the results after the record lookup came back empty.
        result = jobRepository.getJobResult(jobId);
        if (result == null) {
            throw new JobNotFoundException(jobId);
        }
        return result.getJobStatus();
    }

    /**
     * Returns the creation time of a job, taken from its job record or, if there is no
     * record, from its job result.
     *
     * @param jobId id of the job
     * @return creation time stored for the job
     * @throws JetException         if this member is not the master
     * @throws JobNotFoundException if neither a record nor a result exists
     */
    public long getJobSubmissionTime(long jobId) {
        assertIsMaster("Cannot query submission time of job " + com.hazelcast.jet.Util.idToString(jobId)
                + " from non-master node");
        JobRecord record = jobRepository.getJobRecord(jobId);
        if (record == null) {
            JobResult result = jobRepository.getJobResult(jobId);
            if (result == null) {
                throw new JobNotFoundException(jobId);
            }
            return result.getCreationTime();
        }
        return record.getCreationTime();
    }

    /**
     * Returns the snapshot repository this service was constructed with.
     *
     * @return the snapshot repository
     */
    public SnapshotRepository snapshotRepository() {
        return this.snapshotRepository;
    }

    /**
     * Records the completion of a job in the repository and then stops tracking its master
     * context. If the given context is not the one currently tracked for the job, the
     * mismatch is logged at severe level.
     *
     * @param masterContext  context of the completed job
     * @param completionTime completion time passed on to the repository
     * @param error          failure passed on to the repository; may be {@code null} (presumably meaning success, confirm against JobRepository)
     */
    void completeJob(MasterContext masterContext, long completionTime, Throwable error) {
        long jobId = masterContext.jobId();
        String coordinatorUuid = nodeEngine.getNode().getThisUuid();
        // The repository is updated before the context is removed from the map.
        jobRepository.completeJob(jobId, coordinatorUuid, completionTime, error);

        if (masterContexts.remove(masterContext.jobId(), masterContext)) {
            logger.fine(masterContext.jobIdString() + " is completed");
            return;
        }
        MasterContext tracked = masterContexts.get(jobId);
        if (tracked == null) {
            logger.severe("No master context found to complete " + masterContext.jobIdString());
        } else {
            logger.severe("Different master context found to complete " + masterContext.jobIdString()
                    + ", master context execution " + com.hazelcast.jet.Util.idToString(tracked.executionId()));
        }
    }

    /**
     * Sets the suspended flag of the job to {@code true} in the job repository.
     * The boolean returned by the repository is ignored.
     *
     * @param masterContext context of the job to mark as suspended
     */
    void suspendJob(MasterContext masterContext) {
        this.jobRepository.updateJobSuspendedStatus(masterContext.jobId(), true);
    }

    /**
     * Clears the suspended flag of a job and, if the repository reports the update and a job
     * record exists, starts the job or resumes its existing master context.
     *
     * @param jobId id of the job to resume
     * @throws JetException if this member is not the master
     */
    public void resumeJob(long jobId) {
        assertIsMaster("Cannot resume job " + com.hazelcast.jet.Util.idToString(jobId) + " from non-master node");
        if (!jobRepository.updateJobSuspendedStatus(jobId, false)) {
            return;
        }
        JobRecord record = jobRepository.getJobRecord(jobId);
        if (record != null) {
            startJobIfNotStartedOrCompleted(record, "resume request", true);
        }
    }

    /**
     * Schedules {@link #restartJob(long)} on the coordinator executor after the retry delay.
     * Logs at severe level and does nothing if no master context is tracked for the job.
     *
     * @param jobId id of the job to restart
     */
    void scheduleRestart(long jobId) {
        MasterContext ctx = masterContexts.get(jobId);
        if (ctx != null) {
            logger.fine("Scheduling restart on master for job " + ctx.jobName());
            Runnable restartTask = () -> restartJob(jobId);
            nodeEngine.getExecutionService().schedule(
                    COORDINATOR_EXECUTOR_NAME, restartTask, RETRY_DELAY_IN_MILLIS, TimeUnit.MILLISECONDS);
        } else {
            logger.severe("Master context for job " + com.hazelcast.jet.Util.idToString(jobId)
                    + " not found to schedule restart");
        }
    }

    /**
     * Schedules {@link #beginSnapshot(long, long)} on the coordinator executor after the
     * job's configured snapshot interval. Logs a warning and does nothing if no master
     * context is tracked for the job.
     *
     * @param jobId       id of the job
     * @param executionId id of the execution the snapshot belongs to
     */
    void scheduleSnapshot(long jobId, long executionId) {
        MasterContext ctx = masterContexts.get(jobId);
        if (ctx == null) {
            logger.warning("MasterContext not found to schedule snapshot of "
                    + com.hazelcast.jet.Util.idToString(jobId));
            return;
        }
        long intervalMillis = ctx.jobConfig().getSnapshotIntervalMillis();
        InternalExecutionService execService = nodeEngine.getExecutionService();
        if (logger.isFineEnabled()) {
            logger.fine(ctx.jobIdString() + " snapshot is scheduled in " + intervalMillis + "ms");
        }
        Runnable snapshotTask = () -> beginSnapshot(jobId, executionId);
        execService.schedule(COORDINATOR_EXECUTOR_NAME, snapshotTask, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Starts a snapshot for the given execution, provided that the job is tracked, not done,
     * not cancelled and {@code RUNNING}, and that this member is the master with a running
     * node engine. Otherwise the reason is logged and nothing happens.
     *
     * @param jobId       id of the job
     * @param executionId id of the execution to snapshot
     */
    void beginSnapshot(long jobId, long executionId) {
        MasterContext ctx = masterContexts.get(jobId);
        if (ctx == null) {
            logger.warning("MasterContext not found to schedule snapshot of "
                    + com.hazelcast.jet.Util.idToString(jobId));
            return;
        }
        boolean jobNotRunning = ctx.completionFuture().isDone()
                || ctx.isCancelled()
                || ctx.jobStatus() != JobStatus.RUNNING;
        if (jobNotRunning) {
            logger.fine("Not starting snapshot since " + ctx.jobIdString() + " is done.");
            return;
        }
        if (!isMaster()) {
            logger.warning("Not starting snapshot, not a master, master is "
                    + nodeEngine.getClusterService().getMasterAddress());
            return;
        }
        if (!nodeEngine.isRunning()) {
            logger.warning("Not starting snapshot, node engine is not running");
            return;
        }
        ctx.beginSnapshot(executionId);
    }

    /**
     * Stores the final status of a snapshot and then prunes snapshots of the job.
     * After a successful snapshot all other snapshots of the job are deleted; after a failed
     * one only that snapshot is deleted. If the status cannot be stored, no deletion is
     * attempted. All failures are logged as warnings and not propagated.
     *
     * @param jobId      id of the job
     * @param snapshotId id of the snapshot
     * @param isSuccess  whether the snapshot succeeded
     * @param numBytes   number of bytes reported for the snapshot
     * @param numKeys    number of keys reported for the snapshot
     * @param numChunks  number of chunks reported for the snapshot
     */
    void completeSnapshot(long jobId, long snapshotId, boolean isSuccess, long numBytes, long numKeys, long numChunks) {
        MasterContext ctx = masterContexts.get(jobId);
        if (ctx == null) {
            logger.warning("MasterContext not found to finalize snapshot of "
                    + com.hazelcast.jet.Util.idToString(jobId) + " with result: " + isSuccess);
            return;
        }

        try {
            SnapshotRecord.SnapshotStatus status;
            if (isSuccess) {
                status = SnapshotRecord.SnapshotStatus.SUCCESSFUL;
            } else {
                status = SnapshotRecord.SnapshotStatus.FAILED;
            }
            long elapsed = snapshotRepository.setSnapshotComplete(
                    jobId, snapshotId, status, numBytes, numKeys, numChunks);
            logger.info(String.format(
                    "Snapshot %d for %s completed with status %s in %dms, %,d bytes, %,d keys in %,d chunks",
                    snapshotId, ctx.jobIdString(), status, elapsed, numBytes, numKeys, numChunks));
        } catch (Exception e) {
            logger.warning("Cannot update snapshot status for " + ctx.jobIdString() + " snapshot "
                    + snapshotId + " isSuccess: " + isSuccess, e);
            return;
        }

        try {
            if (!isSuccess) {
                snapshotRepository.deleteSingleSnapshot(jobId, snapshotId);
            } else {
                snapshotRepository.deleteAllSnapshotsExceptOne(jobId, snapshotId);
            }
        } catch (Exception e) {
            logger.warning("Cannot delete old snapshots for " + ctx.jobIdString(), e);
        }
    }

    /**
     * Tells whether jobs may be started now. This requires that this member is the master,
     * the node engine is running, no current cluster member is registered as shutting down,
     * the partition state is initialized, migration is allowed and no local migration is
     * ongoing.
     *
     * @return {@code true} if all of the above hold
     */
    boolean shouldStartJobs() {
        if (!(isMaster() && nodeEngine.isRunning())) {
            return false;
        }
        for (Member member : nodeEngine.getClusterService().getMembers()) {
            if (membersShuttingDown.contains(member.getUuid())) {
                LoggingUtil.logFine(logger, "Not starting jobs because members are shutting down: %s",
                        membersShuttingDown);
                return false;
            }
        }
        InternalPartitionServiceImpl partitions = getInternalPartitionService();
        if (!partitions.getPartitionStateManager().isInitialized()) {
            return false;
        }
        if (!partitions.isMigrationAllowed()) {
            return false;
        }
        return !partitions.hasOnGoingMigrationLocal();
    }

    /**
     * Returns the ids of all jobs with the given name, newest first.
     * <p>
     * Ids are gathered from the job records, the tracked master contexts and the job results,
     * keyed by job id so each job appears once, and sorted by creation time in descending
     * order.
     *
     * @param name job name to look up; must not be {@code null}
     * @return job ids ordered by creation time, most recent first
     */
    public List<Long> getJobIds(String name) {
        // Typed map (job id -> creation time); a raw map defeats type inference in the
        // comparator below and degrades the result to an unchecked raw List.
        Map<Long, Long> creationTimeByJobId = new HashMap<>();
        jobRepository.getJobRecords(name)
                .forEach(r -> creationTimeByJobId.put(r.getJobId(), r.getCreationTime()));
        masterContexts.values().stream()
                .filter(ctx -> name.equals(ctx.jobConfig().getName()))
                .forEach(ctx -> creationTimeByJobId.put(ctx.jobId(), ctx.jobRecord().getCreationTime()));
        jobRepository.getJobResults(name)
                .forEach(r -> creationTimeByJobId.put(r.getJobId(), r.getCreationTime()));
        return creationTimeByJobId.entrySet().stream()
                .sorted(Comparator.comparing(Map.Entry<Long, Long>::getValue).reversed())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /**
     * Builds a summary for every job known to the repository, newest first.
     * <p>
     * Summaries derived from job records are added first; summaries derived from job results
     * are added afterwards and replace a record-based summary with the same job id.
     *
     * @return job summaries ordered by submission time, most recent first
     */
    public List<JobSummary> getJobSummaryList() {
        // Typed map (job id -> summary); with a raw map the stream elements are Objects,
        // so JobSummary::getSubmissionTime cannot be applied and the result is unchecked.
        Map<Long, JobSummary> summaryByJobId = new HashMap<>();
        jobRepository.getJobRecords().stream()
                .map(this::getJobSummary)
                .forEach(s -> summaryByJobId.put(s.getJobId(), s));
        jobRepository.getJobResults().stream()
                .map(r -> new JobSummary(r.getJobId(), r.getJobNameOrId(), r.getJobStatus(),
                        r.getCreationTime(), r.getCompletionTime(), r.getFailureReason()))
                .forEach(s -> summaryByJobId.put(s.getJobId(), s));
        return summaryByJobId.values().stream()
                .sorted(Comparator.comparing(JobSummary::getSubmissionTime).reversed())
                .collect(Collectors.toList());
    }

    /**
     * Builds a summary for a job that still has a job record.
     * With a tracked master context, the execution id and status come from that context.
     * Without one, the execution id is 0 and the status is {@code SUSPENDED} or
     * {@code NOT_RUNNING} depending on the record's suspended flag.
     *
     * @param record the job record
     * @return summary of the job
     */
    private JobSummary getJobSummary(JobRecord record) {
        MasterContext ctx = masterContexts.get(record.getJobId());
        long executionId;
        JobStatus status;
        if (ctx != null) {
            executionId = ctx.executionId();
            status = ctx.jobStatus();
        } else {
            executionId = 0L;
            status = record.isSuspended() ? JobStatus.SUSPENDED : JobStatus.NOT_RUNNING;
        }
        return new JobSummary(
                record.getJobId(), executionId, record.getJobNameOrId(), status, record.getCreationTime());
    }

    /**
     * Returns the node's partition service cast to {@link InternalPartitionServiceImpl}.
     *
     * @return the internal partition service implementation
     */
    private InternalPartitionServiceImpl getInternalPartitionService() {
        return (InternalPartitionServiceImpl) nodeEngine.getNode().getPartitionService();
    }

    /**
     * Tries to start the job again through its tracked master context.
     * Logs at severe level and does nothing if no context is tracked for the job.
     *
     * @param jobId id of the job to restart
     */
    void restartJob(long jobId) {
        MasterContext ctx = masterContexts.get(jobId);
        if (ctx != null) {
            tryStartJob(ctx);
        } else {
            logger.severe("Master context for job " + com.hazelcast.jet.Util.idToString(jobId)
                    + " not found to restart");
        }
    }

    /**
     * Periodic task: starts every non-suspended job found in the repository that is not yet
     * started or completed, then runs the repository cleanup. Does nothing when jobs may not
     * be started. A {@link HazelcastInstanceNotActiveException} is swallowed without logging;
     * any other exception is logged at severe level.
     */
    private void scanJobs() {
        if (!shouldStartJobs()) {
            return;
        }
        try {
            for (JobRecord record : jobRepository.getJobRecords()) {
                if (record.isSuspended()) {
                    continue;
                }
                startJobIfNotStartedOrCompleted(record, "discovered by scanning of JobRecords", false);
            }
            performCleanup();
        } catch (HazelcastInstanceNotActiveException ignored) {
            // Deliberately not logged.
        } catch (Exception e) {
            logger.severe("Scanning jobs failed", e);
        }
    }

    /**
     * Runs the repository cleanup, passing the ids of all jobs that currently have a
     * master context.
     */
    private void performCleanup() {
        jobRepository.cleanup(masterContexts.keySet());
    }

    /**
     * Ensures that this member is the cluster master.
     *
     * @param error message prefix for the exception; the current master address is appended
     * @throws JetException if this member is not the master
     */
    private void assertIsMaster(String error) {
        if (isMaster()) {
            return;
        }
        throw new JetException(error + ". Master address: " + nodeEngine.getClusterService().getMasterAddress());
    }

    /**
     * Asks the cluster service whether this member is the master.
     *
     * @return {@code true} if this member is the cluster master
     */
    private boolean isMaster() {
        ClusterService cluster = nodeEngine.getClusterService();
        return cluster.isMaster();
    }

    /**
     * Returns the Jet service this coordinator was constructed with.
     *
     * @return the owning Jet service
     */
    JetService getJetService() {
        return this.jetService;
    }

    /**
     * Registers a member as shutting down and returns a future that completes once all
     * pending graceful-shutdown work for registered members has finished.
     * <p>
     * If {@code uuid} is the local member, this service is shut down first. When the member
     * is newly registered, every tracked master context is notified through
     * {@code onParticipantGracefulShutdown}; the shared {@code terminalSnapshotsFuture} is
     * completed and cleared when the last awaited member's futures are done. When the member
     * was already registered, the existing shared future is returned, or an already-completed
     * future if there is none.
     *
     * @param uuid UUID of the member that is shutting down
     * @return future completing when the awaited work is done; never {@code null}
     */
    @Nonnull
    public CompletableFuture<Void> addShuttingDownMember(String uuid) {
        synchronized (lock) {
            if (uuid.equals(nodeEngine.getLocalMember().getUuid())) {
                shutdown();
            }
            CompletableFuture<Void> result = terminalSnapshotsFuture;
            if (membersShuttingDown.add(uuid)) {
                if (result == null) {
                    // First awaited member: create the future shared by all callers.
                    terminalSnapshotsFuture = result = new CompletableFuture<>();
                }
                logger.fine("Added a shutting-down member: " + uuid);
                CompletableFuture[] futures = masterContexts.values().stream()
                        .map(mc -> mc.onParticipantGracefulShutdown(uuid))
                        .filter(Objects::nonNull)
                        .toArray(CompletableFuture[]::new);
                ++awaitedTerminatingMembersCount;
                CompletableFuture.allOf(futures).whenCompleteAsync((BiConsumer) ExceptionUtil.withTryCatch(logger, (r, e) -> {
                    // Synchronize on the field directly: a lambda body must not declare a local
                    // whose name clashes with a local of the enclosing method.
                    synchronized (lock) {
                        if (--awaitedTerminatingMembersCount == 0) {
                            terminalSnapshotsFuture.complete(null);
                            terminalSnapshotsFuture = null;
                        }
                    }
                }));
            } else if (result == null) {
                // Already registered and nothing is being awaited any more.
                result = CompletableFuture.completedFuture(null);
            }
            return result;
        }
    }

    /**
     * Removes a member from the set of shutting-down members once it has left the cluster.
     * Logs at fine level only when the member was actually in the set.
     *
     * @param uuid UUID of the member that left
     */
    void onMemberLeave(String uuid) {
        boolean wasShuttingDown = membersShuttingDown.remove(uuid);
        if (!wasShuttingDown) {
            return;
        }
        LoggingUtil.logFine(logger, "Removed a shutting-down member: %s, now shuttingDownMembers=%s",
                uuid, membersShuttingDown);
    }
}

