/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.cluster.memberselector.MemberSelectors;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.instance.Node;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.internal.partition.impl.InternalPartitionServiceImpl;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.impl.JobExecutionService;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.JobResult;
import com.hazelcast.jet.impl.MasterContext;
import com.hazelcast.jet.impl.SnapshotRepository;
import com.hazelcast.jet.impl.deployment.JetClassLoader;
import com.hazelcast.jet.impl.execution.SnapshotRecord;
import com.hazelcast.jet.impl.util.JetGroupProperty;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.executionservice.InternalExecutionService;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.util.Clock;
import com.hazelcast.util.executor.ExecutorType;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Coordinates the life cycle of Jet jobs: submission, joining, cancellation,
 * restart, periodic snapshots and status queries.
 *
 * <p>Every tracked job is represented by a {@link MasterContext} held in an
 * in-memory map keyed by job id. Persistent job state ({@link JobRecord}s and
 * {@link JobResult}s) is read from and written to the {@link JobRepository}.
 * Most public operations throw a {@link JetException} when this member is not
 * the cluster master.
 */
public class JobCoordinationService {
    /** Name of the executor on which this service runs its coordination tasks. */
    private static final String COORDINATOR_EXECUTOR_NAME = "jet:coordinator";
    /** Delay, in milliseconds, applied before a scheduled job restart is attempted. */
    private static final long RETRY_DELAY_IN_MILLIS = TimeUnit.SECONDS.toMillis(2L);

    private final NodeEngineImpl nodeEngine;
    private final JetConfig config;
    private final ILogger logger;
    private final JobRepository jobRepository;
    private final JobExecutionService jobExecutionService;
    private final SnapshotRepository snapshotRepository;
    /** Master contexts of the jobs currently tracked by this service, keyed by job id. */
    private final ConcurrentMap<Long, MasterContext> masterContexts = new ConcurrentHashMap<>();

    /**
     * Creates the service. No executor is registered and no task is scheduled
     * until {@link #init()} is called.
     *
     * @param nodeEngine          node engine used for logging, cluster and execution services
     * @param config              Jet configuration from which the job scan period is read
     * @param jobRepository       store of job records, results and resources
     * @param jobExecutionService service used to obtain per-job class loaders
     * @param snapshotRepository  store of snapshot records
     */
    public JobCoordinationService(NodeEngineImpl nodeEngine, JetConfig config, JobRepository jobRepository,
                                  JobExecutionService jobExecutionService, SnapshotRepository snapshotRepository) {
        this.nodeEngine = nodeEngine;
        this.config = config;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.jobRepository = jobRepository;
        this.jobExecutionService = jobExecutionService;
        this.snapshotRepository = snapshotRepository;
    }

    /**
     * Registers the coordinator executor and schedules the repeating job scan.
     * The configured {@link JetGroupProperty#JOB_SCAN_PERIOD} is used both as
     * the initial delay and as the repetition period.
     */
    public void init() {
        InternalExecutionService executionService = nodeEngine.getExecutionService();
        HazelcastProperties properties = new HazelcastProperties(config.getProperties());
        long jobScanPeriodInMillis = properties.getMillis(JetGroupProperty.JOB_SCAN_PERIOD);
        executionService.register(COORDINATOR_EXECUTOR_NAME, 2, Integer.MAX_VALUE, ExecutorType.CACHED);
        executionService.scheduleWithRepetition(COORDINATOR_EXECUTOR_NAME, this::scanJobs,
                jobScanPeriodInMillis, jobScanPeriodInMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Invokes {@link MasterContext#cancelJob()} on every tracked master context.
     */
    public void reset() {
        masterContexts.values().forEach(MasterContext::cancelJob);
    }

    /**
     * Returns the class loader for the given job, obtained from the
     * {@link JobExecutionService}. The supplied action builds a
     * {@link JetClassLoader} over the job's resources in the repository.
     *
     * @param jobId id of the job
     * @return the class loader for the job
     */
    public ClassLoader getClassLoader(long jobId) {
        PrivilegedAction<JetClassLoader> action =
                () -> new JetClassLoader((Map<String, byte[]>) jobRepository.getJobResources(jobId));
        return jobExecutionService.getClassLoader(jobId, action);
    }

    /**
     * Returns a snapshot copy of the tracked master contexts. Changes to the
     * returned map do not affect this service.
     *
     * @return a new map from job id to master context
     */
    public Map<Long, MasterContext> getMasterContexts() {
        return new HashMap<>(masterContexts);
    }

    /**
     * Returns the master context tracked for the given job.
     *
     * @param jobId id of the job
     * @return the master context, or {@code null} if none is tracked
     */
    public MasterContext getMasterContext(long jobId) {
        return masterContexts.get(jobId);
    }

    /**
     * Raises the stored quorum size of every job record that has split-brain
     * protection enabled and whose stored quorum size is smaller than the one
     * derived from the current data member count. Does nothing unless
     * {@link #shouldCheckQuorumValues()} holds. Any exception is logged at
     * fine level and otherwise ignored.
     */
    void updateQuorumValues() {
        if (!shouldCheckQuorumValues()) {
            return;
        }
        try {
            int currentQuorumSize = getQuorumSize();
            for (JobRecord jobRecord : jobRepository.getJobRecords()) {
                if (!jobRecord.getConfig().isSplitBrainProtectionEnabled()) {
                    continue;
                }
                if (currentQuorumSize <= jobRecord.getQuorumSize()) {
                    continue;
                }
                boolean updated = jobRepository.updateJobQuorumSizeIfLargerThanCurrent(
                        jobRecord.getJobId(), currentQuorumSize);
                if (updated) {
                    logger.info("Current quorum size: " + jobRecord.getQuorumSize() + " of job "
                            + Util.idToString(jobRecord.getJobId()) + " is updated to: " + currentQuorumSize);
                }
            }
        } catch (Exception e) {
            logger.fine("check quorum values task failed", e);
        }
    }

    /**
     * @return {@code true} if this member is the master, the node engine is
     *         running and the partition state is initialized
     */
    private boolean shouldCheckQuorumValues() {
        return isMaster()
                && nodeEngine.isRunning()
                && getInternalPartitionService().getPartitionStateManager().isInitialized();
    }

    /**
     * Submits a new job, or joins it if it is already tracked or completed.
     *
     * <p>If a job result already exists, its future is returned. If a master
     * context is already registered for the job, that context's completion
     * future is returned. Otherwise a new job record is stored and the job
     * start is handed to the coordinator executor.
     *
     * @param jobId  id of the job
     * @param dag    serialized DAG of the job
     * @param config configuration of the job
     * @return a future tied to the completion of the job
     * @throws JetException if this member is not the master
     */
    public CompletableFuture<Void> submitOrJoinJob(long jobId, Data dag, JobConfig config) {
        if (!isMaster()) {
            throw new JetException("Cannot submit Job " + Util.idToString(jobId) + ". Master address: "
                    + nodeEngine.getClusterService().getMasterAddress());
        }

        JobResult jobResult = jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            logger.fine("Not starting job " + Util.idToString(jobId)
                    + " since already completed with result: " + jobResult);
            return jobResult.asCompletableFuture();
        }

        int quorumSize = config.isSplitBrainProtectionEnabled() ? getQuorumSize() : 0;
        JobRecord jobRecord = new JobRecord(jobId, Clock.currentTimeMillis(), dag, config, quorumSize);
        MasterContext masterContext = new MasterContext(nodeEngine, this, jobRecord);

        MasterContext prev = masterContexts.putIfAbsent(jobId, masterContext);
        if (prev != null) {
            logger.fine("Joining to already started job " + Util.idToString(jobId));
            return prev.completionFuture();
        }

        // The job may have completed between the first result lookup and the registration above.
        if (completeMasterContextIfJobAlreadyCompleted(masterContext)) {
            return masterContext.completionFuture();
        }

        jobRepository.putNewJobRecord(jobRecord);
        logger.info("Starting job " + Util.idToString(jobId) + " based on submit request from client");
        nodeEngine.getExecutionService().execute(COORDINATOR_EXECUTOR_NAME, () -> tryStartJob(masterContext));
        return masterContext.completionFuture();
    }

    /**
     * Joins a job that was submitted earlier. If a job record exists, the call
     * is delegated to {@link #submitOrJoinJob} with the record's DAG and
     * config; otherwise the future of the stored job result is returned.
     *
     * @param jobId id of the job
     * @return a future tied to the completion of the job
     * @throws JetException         if this member is not the master
     * @throws JobNotFoundException if neither a job record nor a job result exists
     */
    public CompletableFuture<Void> joinSubmittedJob(long jobId) {
        if (!isMaster()) {
            throw new JetException("Cannot join Job " + Util.idToString(jobId) + ". Master address: "
                    + nodeEngine.getClusterService().getMasterAddress());
        }

        JobRecord jobRecord = jobRepository.getJobRecord(jobId);
        if (jobRecord != null) {
            return submitOrJoinJob(jobId, jobRecord.getDag(), jobRecord.getConfig());
        }

        JobResult jobResult = jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            return jobResult.asCompletableFuture();
        }

        throw new JobNotFoundException(jobId);
    }

    /**
     * Starts the job described by the given record unless it already has a
     * result or a master context. Used by the periodic job scan.
     */
    private void startJobIfNotStartedOrCompleted(JobRecord jobRecord) {
        long jobId = jobRecord.getJobId();
        if (jobRepository.getJobResult(jobId) != null || masterContexts.containsKey(jobId)) {
            return;
        }

        MasterContext masterContext = new MasterContext(nodeEngine, this, jobRecord);
        if (masterContexts.putIfAbsent(jobId, masterContext) != null) {
            // Another thread registered a context for this job in the meantime.
            return;
        }

        if (completeMasterContextIfJobAlreadyCompleted(masterContext)) {
            return;
        }

        logger.info("Starting job " + Util.idToString(masterContext.getJobId())
                + " discovered by scanning of JobRecords");
        tryStartJob(masterContext);
    }

    /**
     * Finalizes the given master context without starting the job when either
     * a job result already exists, or auto-restart on member failure is
     * disabled and the repository reports at least one earlier execution id.
     * In the latter case the job is completed with a
     * {@link TopologyChangedException}.
     *
     * @return {@code true} if the context was finalized and removed from the
     *         tracked contexts, {@code false} if the job should be started or
     *         the context was no longer the registered one
     */
    private boolean completeMasterContextIfJobAlreadyCompleted(MasterContext masterContext) {
        long jobId = masterContext.getJobId();

        JobResult jobResult = jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            logger.fine("Completing master context " + Util.idToString(jobId)
                    + " since already completed with result: " + jobResult);
            masterContext.setFinalResult(jobResult.getFailure());
            return masterContexts.remove(jobId, masterContext);
        }

        if (!masterContext.getJobConfig().isAutoRestartOnMemberFailureEnabled()
                && jobRepository.getExecutionIdCount(jobId) > 0L) {
            String coordinator = nodeEngine.getNode().getThisUuid();
            TopologyChangedException result = new TopologyChangedException();
            logger.info("Completing Job " + Util.idToString(jobId) + " with " + result
                    + " since auto-restart is disabled and the job has been executed before");
            jobRepository.completeJob(jobId, coordinator, System.currentTimeMillis(), result);
            masterContext.setFinalResult(result);
            return masterContexts.remove(jobId, masterContext);
        }

        return false;
    }

    /**
     * Asks the master context to start the job, supplying the repository's
     * execution id generator.
     */
    private void tryStartJob(MasterContext masterContext) {
        masterContext.tryStartJob(jobRepository::newExecutionId);
    }

    /**
     * @return the majority size of the current data members: {@code count / 2 + 1}
     */
    private int getQuorumSize() {
        return getDataMemberCount() / 2 + 1;
    }

    /**
     * @param quorumSize the required number of data members
     * @return {@code true} if the current data member count is at least {@code quorumSize}
     */
    boolean isQuorumPresent(int quorumSize) {
        return getDataMemberCount() >= quorumSize;
    }

    /**
     * @return the number of cluster members matching {@link MemberSelectors#DATA_MEMBER_SELECTOR}
     */
    private int getDataMemberCount() {
        ClusterService clusterService = nodeEngine.getClusterService();
        return clusterService.getMembers(MemberSelectors.DATA_MEMBER_SELECTOR).size();
    }

    /**
     * Triggers cancellation of the given job. Returns silently if the job
     * already has a result, or if cancellation was already requested.
     *
     * @param jobId id of the job
     * @throws JetException                if this member is not the master
     * @throws RetryableHazelcastException if the job has no result and no master context
     */
    public void cancelJob(long jobId) {
        if (!isMaster()) {
            throw new JetException("Cannot cancel Job " + Util.idToString(jobId) + ". Master address: "
                    + nodeEngine.getClusterService().getMasterAddress());
        }

        if (jobRepository.getJobResult(jobId) != null) {
            logger.fine("Cannot cancel Job " + Util.idToString(jobId) + " because it already has a result");
            return;
        }

        MasterContext masterContext = masterContexts.get(jobId);
        if (masterContext == null) {
            throw new RetryableHazelcastException("No MasterContext found for Job " + Util.idToString(jobId)
                    + " to cancel");
        }

        if (masterContext.isCancelled()) {
            logger.info("Job " + Util.idToString(jobId) + " is already cancelling...");
        } else {
            logger.info("Job " + Util.idToString(jobId) + " cancellation is triggered");
            masterContext.cancelJob();
        }
    }

    /**
     * @return a new set holding every job id known to the repository together
     *         with the ids of all tracked master contexts
     */
    public Set<Long> getAllJobIds() {
        Set<Long> jobIds = new HashSet<>(jobRepository.getAllJobIds());
        jobIds.addAll(masterContexts.keySet());
        return jobIds;
    }

    /**
     * Returns the status of the given job. A stored job result takes
     * precedence, then the tracked master context, then the job record.
     * A context that reports {@link JobStatus#RUNNING} but is cancelled is
     * reported as {@link JobStatus#COMPLETING}. A job with only a record is
     * {@link JobStatus#NOT_STARTED}.
     *
     * @param jobId id of the job
     * @return the job status
     * @throws JetException         if this member is not the master
     * @throws JobNotFoundException if no result, context or record exists for the job
     */
    public JobStatus getJobStatus(long jobId) {
        if (!isMaster()) {
            throw new JetException("Cannot query status of Job " + Util.idToString(jobId) + ". Master address: "
                    + nodeEngine.getClusterService().getMasterAddress());
        }

        JobResult jobResult = jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            return jobResult.getJobStatus();
        }

        MasterContext currentMasterContext = masterContexts.get(jobId);
        if (currentMasterContext != null) {
            JobStatus jobStatus = currentMasterContext.jobStatus();
            if (jobStatus == JobStatus.RUNNING && currentMasterContext.isCancelled()) {
                return JobStatus.COMPLETING;
            }
            return jobStatus;
        }

        if (jobRepository.getJobRecord(jobId) != null) {
            return JobStatus.NOT_STARTED;
        }

        // No record: look for the result once more, since it may have been
        // stored after the first lookup above.
        jobResult = jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            return jobResult.getJobStatus();
        }
        throw new JobNotFoundException(jobId);
    }

    /**
     * Returns the creation time of the job, taken from its job record or, if
     * there is none, from its job result.
     *
     * @param jobId id of the job
     * @return the creation time stored for the job
     * @throws JetException         if this member is not the master
     * @throws JobNotFoundException if neither a record nor a result exists
     */
    public long getJobSubmissionTime(long jobId) {
        if (!isMaster()) {
            throw new JetException("Cannot query submission time of Job " + Util.idToString(jobId)
                    + ". Master address: " + nodeEngine.getClusterService().getMasterAddress());
        }

        JobRecord jobRecord = jobRepository.getJobRecord(jobId);
        if (jobRecord != null) {
            return jobRecord.getCreationTime();
        }

        JobResult jobResult = jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            return jobResult.getCreationTime();
        }

        throw new JobNotFoundException(jobId);
    }

    /**
     * Returns the configuration of the job, taken from its job record or, if
     * there is none, from its job result.
     *
     * @param jobId id of the job
     * @return the job configuration
     * @throws JetException         if this member is not the master
     * @throws JobNotFoundException if neither a record nor a result exists
     */
    public JobConfig getJobConfig(long jobId) {
        if (!isMaster()) {
            throw new JetException("Cannot query config of Job " + Util.idToString(jobId) + ". Master address: "
                    + nodeEngine.getClusterService().getMasterAddress());
        }

        JobRecord jobRecord = jobRepository.getJobRecord(jobId);
        if (jobRecord != null) {
            return jobRecord.getConfig();
        }

        JobResult jobResult = jobRepository.getJobResult(jobId);
        if (jobResult != null) {
            return jobResult.getJobConfig();
        }

        throw new JobNotFoundException(jobId);
    }

    /**
     * Requests a restart of the current execution of the given job.
     *
     * @param jobId id of the job
     * @return the value returned by {@link MasterContext#restartExecution()},
     *         or {@code false} if the job has a record but no master context yet
     * @throws IllegalStateException if the job has no master context and either
     *                               already has a result or has no job record
     */
    public boolean restartJobExecution(long jobId) {
        MasterContext masterContext = masterContexts.get(jobId);
        if (masterContext == null) {
            JobResult jobResult = jobRepository.getJobResult(jobId);
            if (jobResult != null) {
                throw new IllegalStateException("Cannot restart job " + Util.idToString(jobId)
                        + " because it is already " + jobResult.getJobStatus());
            }
            if (jobRepository.getJobRecord(jobId) != null) {
                logger.warning("Cannot restart job " + Util.idToString(jobId)
                        + " because it is not initialized yet");
                return false;
            }
            throw new IllegalStateException("Cannot restart job " + Util.idToString(jobId)
                    + " because JobRecord was not found");
        }

        boolean cancelled = masterContext.restartExecution();
        if (cancelled) {
            logger.info("Job " + Util.idToString(jobId) + " is going to be restarted");
        } else {
            logger.warning("Cannot restart job " + Util.idToString(jobId)
                    + " because it is not currently being executed");
        }
        return cancelled;
    }

    /**
     * @return the snapshot repository used by this service
     */
    SnapshotRepository snapshotRepository() {
        return snapshotRepository;
    }

    /**
     * Records completion of the job in the repository and stops tracking the
     * given master context. Logs at severe level when the tracked context for
     * the job is missing or is a different instance.
     *
     * @param masterContext  context of the completed job
     * @param executionId    id of the execution that completed, used for logging
     * @param completionTime completion timestamp passed on to the repository
     * @param error          failure passed on to the repository
     */
    void completeJob(MasterContext masterContext, long executionId, long completionTime, Throwable error) {
        long jobId = masterContext.getJobId();
        String coordinator = nodeEngine.getNode().getThisUuid();
        jobRepository.completeJob(jobId, coordinator, completionTime, error);

        if (masterContexts.remove(jobId, masterContext)) {
            logger.fine(Util.jobAndExecutionId(jobId, executionId) + " is completed");
            return;
        }

        MasterContext existing = masterContexts.get(jobId);
        if (existing != null) {
            logger.severe("Different master context found to complete " + Util.jobAndExecutionId(jobId, executionId)
                    + ", master context execution " + Util.idToString(existing.getExecutionId()));
        } else {
            logger.severe("No master context found to complete " + Util.jobAndExecutionId(jobId, executionId));
        }
    }

    /**
     * Schedules a restart of the given job on the coordinator executor after
     * {@link #RETRY_DELAY_IN_MILLIS}. Logs at severe level if the job has no
     * master context.
     *
     * @param jobId id of the job
     */
    void scheduleRestart(long jobId) {
        if (masterContexts.get(jobId) == null) {
            logger.severe("Master context for job " + Util.idToString(jobId) + " not found to schedule restart");
            return;
        }

        logger.fine("Scheduling restart on master for job " + Util.idToString(jobId));
        nodeEngine.getExecutionService().schedule(COORDINATOR_EXECUTOR_NAME, () -> restartJob(jobId),
                RETRY_DELAY_IN_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules the next snapshot of the given execution on the coordinator
     * executor, delayed by the job's configured snapshot interval. Logs a
     * warning if the job has no master context.
     *
     * @param jobId       id of the job
     * @param executionId id of the execution to snapshot
     */
    void scheduleSnapshot(long jobId, long executionId) {
        MasterContext masterContext = masterContexts.get(jobId);
        if (masterContext == null) {
            logger.warning("MasterContext not found to schedule snapshot of "
                    + Util.jobAndExecutionId(jobId, executionId));
            return;
        }

        long snapshotInterval = masterContext.getJobConfig().getSnapshotIntervalMillis();
        InternalExecutionService executionService = nodeEngine.getExecutionService();
        if (logger.isFineEnabled()) {
            logger.fine(Util.jobAndExecutionId(jobId, executionId) + " snapshot is scheduled in "
                    + snapshotInterval + "ms");
        }
        executionService.schedule(COORDINATOR_EXECUTOR_NAME, () -> beginSnapshot(jobId, executionId),
                snapshotInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Begins a snapshot of the given execution. Skipped when the job is done,
     * cancelled or not running. Rescheduled instead of started when
     * {@link #shouldStartJobs()} is {@code false}.
     */
    private void beginSnapshot(long jobId, long executionId) {
        MasterContext masterContext = masterContexts.get(jobId);
        if (masterContext == null) {
            logger.warning("MasterContext not found to begin snapshot of "
                    + Util.jobAndExecutionId(jobId, executionId));
            return;
        }

        if (masterContext.completionFuture().isDone()
                || masterContext.isCancelled()
                || masterContext.jobStatus() != JobStatus.RUNNING) {
            logger.fine("Not starting snapshot since " + Util.jobAndExecutionId(jobId, executionId) + " is done.");
            return;
        }

        if (!shouldStartJobs()) {
            // Conditions for coordinating work are not met right now; try again after the snapshot interval.
            scheduleSnapshot(jobId, executionId);
            return;
        }

        masterContext.beginSnapshot(executionId);
    }

    /**
     * Records the outcome of a snapshot, deletes snapshots that are no longer
     * needed and schedules the next snapshot.
     *
     * <p>If the status update fails, a warning is logged and the next snapshot
     * is not scheduled. If deleting snapshots fails, a warning is logged and
     * the next snapshot is still scheduled.
     *
     * @param jobId       id of the job
     * @param executionId id of the execution the snapshot belongs to
     * @param snapshotId  id of the completed snapshot
     * @param isSuccess   whether the snapshot succeeded
     */
    void completeSnapshot(long jobId, long executionId, long snapshotId, boolean isSuccess) {
        if (masterContexts.get(jobId) == null) {
            logger.warning("MasterContext not found to finalize snapshot of "
                    + Util.jobAndExecutionId(jobId, executionId) + " with result: " + isSuccess);
            return;
        }

        try {
            SnapshotRecord.SnapshotStatus status = isSuccess
                    ? SnapshotRecord.SnapshotStatus.SUCCESSFUL
                    : SnapshotRecord.SnapshotStatus.FAILED;
            long elapsed = snapshotRepository.setSnapshotStatus(jobId, snapshotId, status);
            logger.info(String.format("Snapshot %s for job %s completed with status %s in %dms",
                    snapshotId, Util.idToString(jobId), status, elapsed));
        } catch (Exception e) {
            // Pass the exception to the logger so the cause of the failure is not lost.
            logger.warning("Cannot update snapshot status for " + Util.jobAndExecutionId(jobId, executionId)
                    + " snapshot " + snapshotId + " isSuccess: " + isSuccess, e);
            return;
        }

        try {
            if (isSuccess) {
                snapshotRepository.deleteAllSnapshotsExceptOne(jobId, snapshotId);
            } else {
                snapshotRepository.deleteSingleSnapshot(jobId, snapshotId);
            }
        } catch (Exception e) {
            // Deletion is best-effort; log the cause and carry on with scheduling.
            logger.warning("Cannot delete old snapshots for " + Util.jobAndExecutionId(jobId, executionId), e);
        }

        scheduleSnapshot(jobId, executionId);
    }

    /**
     * @return {@code true} if this member is the master, the node engine is
     *         running, the partition state is initialized, migration is
     *         allowed and no local migration is ongoing
     */
    boolean shouldStartJobs() {
        if (!isMaster() || !nodeEngine.isRunning()) {
            return false;
        }

        InternalPartitionServiceImpl partitionService = getInternalPartitionService();
        return partitionService.getPartitionStateManager().isInitialized()
                && partitionService.isMigrationAllowed()
                && !partitionService.hasOnGoingMigrationLocal();
    }

    /**
     * Returns the ids of all jobs with the given name, newest first.
     * Creation times are gathered from job records, tracked master contexts
     * and job results, in that order, so a later source overwrites the time
     * recorded by an earlier one for the same job id.
     *
     * @param name job name to look up
     * @return job ids sorted by creation time in descending order
     */
    public List<Long> getJobIds(String name) {
        Map<Long, Long> creationTimeByJobId = new HashMap<>();

        jobRepository.getJobRecords(name)
                .forEach(r -> creationTimeByJobId.put(r.getJobId(), r.getCreationTime()));

        masterContexts.values().stream()
                .filter(ctx -> name.equals(ctx.getJobConfig().getName()))
                .forEach(ctx -> creationTimeByJobId.put(ctx.getJobId(), ctx.getJobRecord().getCreationTime()));

        jobRepository.getJobResults(name)
                .forEach(r -> creationTimeByJobId.put(r.getJobId(), r.getCreationTime()));

        return creationTimeByJobId.entrySet().stream()
                .sorted(Comparator.comparing(Map.Entry<Long, Long>::getValue).reversed())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /**
     * @return the node's partition service, cast to {@link InternalPartitionServiceImpl}
     */
    private InternalPartitionServiceImpl getInternalPartitionService() {
        Node node = nodeEngine.getNode();
        return (InternalPartitionServiceImpl) node.getPartitionService();
    }

    /**
     * Tries to start the job again if it still has a master context;
     * otherwise logs at severe level.
     */
    private void restartJob(long jobId) {
        MasterContext masterContext = masterContexts.get(jobId);
        if (masterContext == null) {
            logger.severe("Master context for job " + Util.idToString(jobId) + " not found to restart");
            return;
        }
        // The start is attempted whether or not the job is cancelled.
        tryStartJob(masterContext);
    }

    /**
     * Periodic task: starts every job that has a record but neither a result
     * nor a master context, then runs repository cleanup. Does nothing unless
     * {@link #shouldStartJobs()} holds.
     */
    private void scanJobs() {
        if (!shouldStartJobs()) {
            return;
        }

        try {
            Collection<JobRecord> jobs = jobRepository.getJobRecords();
            jobs.forEach(this::startJobIfNotStartedOrCompleted);
            performCleanup();
        } catch (HazelcastInstanceNotActiveException ignored) {
            // Deliberately not logged; presumably the instance is shutting down.
        } catch (Exception e) {
            logger.severe("Scanning jobs failed", e);
        }
    }

    /**
     * Runs repository cleanup, passing the ids of the tracked jobs. The key
     * set is a live view of the tracked contexts, not a copy.
     */
    private void performCleanup() {
        Set<Long> runningJobIds = masterContexts.keySet();
        jobRepository.cleanup(runningJobIds);
    }

    /**
     * @return {@code true} if the cluster service reports this member as the master
     */
    private boolean isMaster() {
        return nodeEngine.getClusterService().isMaster();
    }
}

