/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.ClusterServiceImpl;
import com.hazelcast.internal.cluster.impl.MembershipManager;
import com.hazelcast.internal.cluster.impl.operations.TriggerMemberListPublishOp;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.MwCounter;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.deployment.JetClassLoader;
import com.hazelcast.jet.impl.execution.ExecutionContext;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.execution.TaskletExecutionService;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.metrics.RawJobMetrics;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.security.AccessController;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

public class JobExecutionService
implements DynamicMetricsProvider {
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final TaskletExecutionService taskletExecutionService;
    private final JobRepository jobRepository;
    private final Set<Long> executionContextJobIds = Collections.newSetFromMap(new ConcurrentHashMap());
    private final ConcurrentMap<Long, ExecutionContext> executionContexts = new ConcurrentHashMap<Long, ExecutionContext>();
    private final ConcurrentHashMap<Long, JetClassLoader> classLoaders = new ConcurrentHashMap();
    @Probe(name="jobs.executionStarted")
    private final Counter executionStarted = MwCounter.newMwCounter();
    @Probe(name="jobs.executionCompleted")
    private final Counter executionCompleted = MwCounter.newMwCounter();

    JobExecutionService(NodeEngineImpl nodeEngine, TaskletExecutionService taskletExecutionService, JobRepository jobRepository) {
        this.nodeEngine = nodeEngine;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.taskletExecutionService = taskletExecutionService;
        this.jobRepository = jobRepository;
        MetricsRegistry registry = nodeEngine.getMetricsRegistry();
        MetricDescriptor descriptor = registry.newMetricDescriptor().withTag("module", "jet");
        registry.registerStaticMetrics(descriptor, (Object)this);
    }

    public Long getExecutionIdForJobId(long jobId) {
        return this.executionContexts.values().stream().filter(ec -> ec.jobId() == jobId).findAny().map(ExecutionContext::executionId).orElse(null);
    }

    public ClassLoader getClassLoader(JobConfig config, long jobId) {
        return this.classLoaders.computeIfAbsent(jobId, k -> AccessController.doPrivileged(() -> {
            ClassLoader parent = config.getClassLoaderFactory() != null ? config.getClassLoaderFactory().getJobClassLoader() : this.nodeEngine.getConfigClassLoader();
            return new JetClassLoader((NodeEngine)this.nodeEngine, parent, config.getName(), jobId, this.jobRepository);
        }));
    }

    public ExecutionContext getExecutionContext(long executionId) {
        return (ExecutionContext)this.executionContexts.get(executionId);
    }

    Map<Long, ExecutionContext> getExecutionContextsFor(Address member) {
        return this.executionContexts.entrySet().stream().filter(entry -> ((ExecutionContext)entry.getValue()).hasParticipant(member)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> getSenderMap(long executionId) {
        ExecutionContext ctx = (ExecutionContext)this.executionContexts.get(executionId);
        return ctx != null ? ctx.senderMap() : null;
    }

    public synchronized void shutdown() {
        this.cancelAllExecutions("Node is shutting down", HazelcastInstanceNotActiveException::new);
    }

    public void reset() {
        this.cancelAllExecutions("reset", TopologyChangedException::new);
    }

    private void cancelAllExecutions(String reason, Supplier<RuntimeException> exceptionSupplier) {
        this.executionContexts.values().forEach(exeCtx -> {
            String message = String.format("Completing %s locally. Reason: %s", exeCtx.jobNameAndExecutionId(), reason);
            this.cancelAndComplete((ExecutionContext)exeCtx, message, (Throwable)exceptionSupplier.get());
        });
    }

    void onMemberRemoved(Address address) {
        this.executionContexts.values().stream().filter(exeCtx -> exeCtx.coordinator().equals((Object)address) || exeCtx.hasParticipant(address)).forEach(exeCtx -> {
            String message = String.format("Completing %s locally. Reason: Member %s left the cluster", exeCtx.jobNameAndExecutionId(), address);
            this.cancelAndComplete((ExecutionContext)exeCtx, message, (Throwable)((Object)new TopologyChangedException("Topology has been changed.")));
        });
    }

    private void cancelAndComplete(ExecutionContext exeCtx, String message, Throwable t) {
        try {
            exeCtx.terminateExecution(null).whenComplete((BiConsumer)ExceptionUtil.withTryCatch(this.logger, (r, e) -> {
                long executionId = exeCtx.executionId();
                this.logger.fine(message);
                this.completeExecution(executionId, t);
            }));
        }
        catch (Throwable e2) {
            this.logger.severe(String.format("Local cancellation of %s failed", exeCtx.jobNameAndExecutionId()), e2);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void initExecution(long jobId, long executionId, Address coordinator, int coordinatorMemberListVersion, Set<MemberInfo> participants, ExecutionPlan plan) {
        this.verifyClusterInformation(jobId, executionId, coordinator, coordinatorMemberListVersion, participants);
        this.failIfNotRunning();
        if (!this.executionContextJobIds.add(jobId)) {
            ExecutionContext current = (ExecutionContext)this.executionContexts.get(executionId);
            if (current != null) {
                throw new IllegalStateException(String.format("Execution context for %s for coordinator %s already exists for coordinator %s", current.jobNameAndExecutionId(), coordinator, current.coordinator()));
            }
            this.executionContexts.values().stream().filter(e -> e.jobId() == jobId).forEach(e -> this.logger.fine(String.format("Execution context for job %s for coordinator %s already exists with local execution %s for coordinator %s", com.hazelcast.jet.Util.idToString(jobId), coordinator, com.hazelcast.jet.Util.idToString(e.executionId()), e.coordinator())));
            throw new RetryableHazelcastException();
        }
        Set<Address> addresses = participants.stream().map(MemberInfo::getAddress).collect(Collectors.toSet());
        ExecutionContext created = new ExecutionContext((NodeEngine)this.nodeEngine, this.taskletExecutionService, jobId, executionId, coordinator, addresses);
        try {
            ClassLoader jobCl = this.getClassLoader(plan.getJobConfig(), jobId);
            Util.doWithClassLoader(jobCl, () -> created.initialize(plan));
        }
        finally {
            ExecutionContext oldContext = this.executionContexts.put(executionId, created);
            assert (oldContext == null) : "Duplicate ExecutionContext for execution " + com.hazelcast.jet.Util.idToString(executionId);
        }
        this.logger.info("Execution plan for jobId=" + com.hazelcast.jet.Util.idToString(jobId) + ", jobName=" + (created.jobName() != null ? '\'' + created.jobName() + '\'' : "null") + ", executionId=" + com.hazelcast.jet.Util.idToString(executionId) + " initialized");
    }

    private void verifyClusterInformation(long jobId, long executionId, Address coordinator, int coordinatorMemberListVersion, Set<MemberInfo> participants) {
        Address masterAddress = this.nodeEngine.getMasterAddress();
        if (!coordinator.equals((Object)masterAddress)) {
            this.failIfNotRunning();
            throw new IllegalStateException(String.format("Coordinator %s cannot initialize %s. Reason: it is not the master, the master is %s", coordinator, Util.jobIdAndExecutionId(jobId, executionId), masterAddress));
        }
        ClusterServiceImpl clusterService = (ClusterServiceImpl)this.nodeEngine.getClusterService();
        MembershipManager membershipManager = clusterService.getMembershipManager();
        int localMemberListVersion = membershipManager.getMemberListVersion();
        Address thisAddress = this.nodeEngine.getThisAddress();
        if (coordinatorMemberListVersion > localMemberListVersion) {
            assert (!masterAddress.equals((Object)thisAddress)) : String.format("Local node: %s is master but InitOperation has coordinator member list version: %s larger than  local member list version: %s", thisAddress, coordinatorMemberListVersion, localMemberListVersion);
            this.nodeEngine.getOperationService().send((Operation)new TriggerMemberListPublishOp(), masterAddress);
            throw new RetryableHazelcastException(String.format("Cannot initialize %s for coordinator %s, local member list version %s, coordinator member list version %s", Util.jobIdAndExecutionId(jobId, executionId), coordinator, localMemberListVersion, coordinatorMemberListVersion));
        }
        boolean isLocalMemberParticipant = false;
        for (MemberInfo participant : participants) {
            if (participant.getAddress().equals((Object)thisAddress)) {
                isLocalMemberParticipant = true;
            }
            if (membershipManager.getMember(participant.getAddress(), participant.getUuid()) != null) continue;
            throw new TopologyChangedException(String.format("Cannot initialize %s for coordinator %s: participant %s not found in local member list. Local member list version: %s, coordinator member list version: %s", Util.jobIdAndExecutionId(jobId, executionId), coordinator, participant, localMemberListVersion, coordinatorMemberListVersion));
        }
        if (!isLocalMemberParticipant) {
            throw new IllegalArgumentException(String.format("Cannot initialize %s since member %s is not in participants: %s", Util.jobIdAndExecutionId(jobId, executionId), thisAddress, participants));
        }
    }

    private void failIfNotRunning() {
        if (!this.nodeEngine.isRunning()) {
            throw new HazelcastInstanceNotActiveException();
        }
    }

    public ExecutionContext assertExecutionContext(Address callerAddress, long jobId, long executionId, String callerOpName) {
        Address masterAddress = this.nodeEngine.getMasterAddress();
        if (!callerAddress.equals((Object)masterAddress)) {
            this.failIfNotRunning();
            throw new IllegalStateException(String.format("Caller %s cannot do '%s' for %s: it is not the master, the master is %s", callerAddress, callerOpName, Util.jobIdAndExecutionId(jobId, executionId), masterAddress));
        }
        this.failIfNotRunning();
        ExecutionContext executionContext = (ExecutionContext)this.executionContexts.get(executionId);
        if (executionContext == null) {
            throw new TopologyChangedException(String.format("%s not found for coordinator %s for '%s'", Util.jobIdAndExecutionId(jobId, executionId), callerAddress, callerOpName));
        }
        if (!executionContext.coordinator().equals((Object)callerAddress) || executionContext.jobId() != jobId) {
            throw new IllegalStateException(String.format("%s, originally from coordinator %s, cannot do '%s' by coordinator %s and execution %s", executionContext.jobNameAndExecutionId(), executionContext.coordinator(), callerOpName, callerAddress, com.hazelcast.jet.Util.idToString(executionId)));
        }
        return executionContext;
    }

    public void beforeCompleteExecution(long executionId) {
        ExecutionContext executionContext = (ExecutionContext)this.executionContexts.get(executionId);
        if (executionContext != null) {
            executionContext.setCompletionTime();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void completeExecution(long executionId, Throwable error) {
        ExecutionContext executionContext = (ExecutionContext)this.executionContexts.remove(executionId);
        if (executionContext != null) {
            JetClassLoader removedClassLoader = this.classLoaders.remove(executionContext.jobId());
            try {
                Util.doWithClassLoader(removedClassLoader, () -> executionContext.completeExecution(error));
            }
            finally {
                this.executionCompleted.inc();
                removedClassLoader.shutdown();
                this.executionContextJobIds.remove(executionContext.jobId());
                this.logger.fine("Completed execution of " + executionContext.jobNameAndExecutionId());
            }
        } else {
            this.logger.fine("Execution " + com.hazelcast.jet.Util.idToString(executionId) + " not found for completion");
        }
    }

    public void updateMetrics(@Nonnull Long executionId, RawJobMetrics metrics) {
        ExecutionContext executionContext = (ExecutionContext)this.executionContexts.get(executionId);
        if (executionContext != null) {
            executionContext.setJobMetrics(metrics);
        }
    }

    public CompletableFuture<Void> beginExecution(Address coordinator, long jobId, long executionId) {
        ExecutionContext execCtx = this.assertExecutionContext(coordinator, jobId, executionId, "ExecuteJobOperation");
        this.logger.info("Start execution of " + execCtx.jobNameAndExecutionId() + " from coordinator " + coordinator);
        this.executionStarted.inc();
        CompletableFuture<Void> future = execCtx.beginExecution();
        future.whenComplete((BiConsumer)ExceptionUtil.withTryCatch(this.logger, (i, e) -> {
            if (e instanceof CancellationException) {
                this.logger.fine("Execution of " + execCtx.jobNameAndExecutionId() + " was cancelled");
            } else if (e != null) {
                this.logger.fine("Execution of " + execCtx.jobNameAndExecutionId() + " completed with failure", e);
            } else {
                this.logger.fine("Execution of " + execCtx.jobNameAndExecutionId() + " completed");
            }
        }));
        return future;
    }

    int numberOfExecutions() {
        return this.executionContexts.size();
    }

    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        descriptor.withTag("module", "jet");
        this.executionContexts.forEach((id, ctx) -> ctx.provideDynamicMetrics(descriptor.copy(), context));
    }
}

