/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.ClusterServiceImpl;
import com.hazelcast.internal.cluster.impl.MembershipManager;
import com.hazelcast.internal.cluster.impl.operations.TriggerMemberListPublishOp;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.impl.deployment.JetClassLoader;
import com.hazelcast.jet.impl.execution.ExecutionContext;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.execution.TaskletExecutionService;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Keeps track of the {@link ExecutionContext}s and per-job class loaders held
 * by this member and drives their local lifecycle: initialization, lookup,
 * cancellation and completion.
 */
public class JobExecutionService {
    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final TaskletExecutionService taskletExecutionService;
    /** Job ids registered by {@link #initExecution} and released by {@link #completeExecution}. */
    private final Set<Long> executionContextJobIds = Collections.newSetFromMap(new ConcurrentHashMap<>());
    /** Execution contexts keyed by execution id. */
    private final ConcurrentMap<Long, ExecutionContext> executionContexts = new ConcurrentHashMap<>();
    /** Class loaders keyed by job id. */
    private final ConcurrentHashMap<Long, JetClassLoader> classLoaders = new ConcurrentHashMap<>();

    JobExecutionService(NodeEngineImpl nodeEngine, TaskletExecutionService taskletExecutionService) {
        this.nodeEngine = nodeEngine;
        this.logger = nodeEngine.getLogger(this.getClass());
        this.taskletExecutionService = taskletExecutionService;
    }

    /**
     * Returns the class loader registered for the given job, creating it on
     * first use.
     *
     * @param jobId  id of the job the class loader belongs to
     * @param action factory run via {@link AccessController#doPrivileged(PrivilegedAction)}
     *               when no class loader is registered for {@code jobId} yet
     * @return the existing or newly created class loader
     */
    public ClassLoader getClassLoader(long jobId, PrivilegedAction<JetClassLoader> action) {
        return this.classLoaders.computeIfAbsent(jobId, k -> AccessController.doPrivileged(action));
    }

    /**
     * Looks up an execution context by execution id.
     *
     * @return the context, or {@code null} if none is registered under {@code executionId}
     */
    public ExecutionContext getExecutionContext(long executionId) {
        return this.executionContexts.get(executionId);
    }

    /**
     * Returns a new map (execution id to context) holding the contexts that
     * report the given member as a participant.
     */
    Map<Long, ExecutionContext> getExecutionContextsFor(Address member) {
        return this.executionContexts.entrySet().stream()
                .filter(entry -> entry.getValue().hasParticipant(member))
                .collect(Collectors.toMap(DistributedFunctions.entryKey(), DistributedFunctions.entryValue()));
    }

    /**
     * Returns the sender map of the given execution.
     *
     * @return the context's sender map, or {@code null} if the execution is unknown
     */
    Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> getSenderMap(long executionId) {
        ExecutionContext ctx = this.executionContexts.get(executionId);
        return ctx != null ? ctx.senderMap() : null;
    }

    /**
     * Cancels every known execution and completes it locally.
     *
     * @param reason            text included in the log message
     * @param exceptionSupplier invoked once per execution to obtain the error the
     *                          execution is completed with
     */
    public void reset(String reason, Supplier<RuntimeException> exceptionSupplier) {
        this.executionContexts.values().forEach(exeCtx -> {
            String message = String.format("Completing %s locally. Reason: %s", Util.jobAndExecutionId(exeCtx.jobId(), exeCtx.executionId()), reason);
            this.cancelAndComplete(exeCtx, message, exceptionSupplier.get());
        });
    }

    /**
     * Cancels and locally completes, with a {@link TopologyChangedException},
     * every execution whose coordinator or one of whose participants is the
     * given address.
     *
     * @param address address of the member that left
     */
    void onMemberLeave(Address address) {
        this.executionContexts.values().stream()
                // The departed member may be the coordinator or a plain participant.
                .filter(exeCtx -> exeCtx.coordinator().equals(address) || exeCtx.hasParticipant(address))
                .forEach(exeCtx -> {
                    String message = String.format("Completing %s locally. Reason: Member %s left the cluster", Util.jobAndExecutionId(exeCtx.jobId(), exeCtx.executionId()), address);
                    this.cancelAndComplete(exeCtx, message, new TopologyChangedException("Topology has been changed."));
                });
    }

    /**
     * Requests cancellation of the execution and, once the cancellation stage
     * completes, logs {@code message} and completes the execution with {@code t}.
     * A failure while requesting the cancellation is logged and not propagated.
     */
    private void cancelAndComplete(ExecutionContext exeCtx, String message, Throwable t) {
        try {
            // NOTE(review): the raw BiConsumer cast comes from the decompiler; it is presumably
            // removable, but the signature of ExceptionUtil.withTryCatch must be confirmed first.
            exeCtx.cancelExecution().whenComplete((BiConsumer) ExceptionUtil.withTryCatch(this.logger, (r, e) -> {
                long executionId = exeCtx.executionId();
                this.logger.fine(message);
                this.completeExecution(executionId, t);
            }));
        } catch (Throwable failure) {
            this.logger.severe(String.format("Local cancellation of %s failed", Util.jobAndExecutionId(exeCtx.jobId(), exeCtx.executionId())), failure);
        }
    }

    /**
     * Validates the caller's view of the cluster, then creates, initializes and
     * registers the execution context for the given execution.
     *
     * @param jobId                        id of the job
     * @param executionId                  id of the execution being initialized
     * @param coordinator                  address of the member requesting the initialization
     * @param coordinatorMemberListVersion member list version seen by the coordinator
     * @param participants                 members taking part in the execution
     * @param plan                         plan the new context is initialized with
     * @throws IllegalStateException       if a context is already registered under
     *                                     {@code executionId}, or the coordinator is not the master
     * @throws RetryableHazelcastException if the job id is already registered but no context
     *                                     exists under {@code executionId}, or the local member
     *                                     list is older than the coordinator's
     * @throws TopologyChangedException    if a participant is missing from the local member list
     * @throws IllegalArgumentException    if this member is not among the participants
     * @throws HazelcastInstanceNotActiveException if the node is not running
     */
    public void initExecution(long jobId, long executionId, Address coordinator, int coordinatorMemberListVersion, Set<MemberInfo> participants, ExecutionPlan plan) {
        this.verifyClusterInformation(jobId, executionId, coordinator, coordinatorMemberListVersion, participants);
        this.failIfNotRunning();
        if (!this.executionContextJobIds.add(jobId)) {
            // The job id is already registered on this member.
            ExecutionContext current = this.executionContexts.get(executionId);
            if (current != null) {
                throw new IllegalStateException(String.format("Execution context for %s for coordinator %s already exists for coordinator %s", Util.jobAndExecutionId(jobId, executionId), coordinator, current.coordinator()));
            }
            // Report the execution(s) of the same job that block this one, then ask the caller to retry.
            this.executionContexts.values().stream()
                    .filter(e -> e.jobId() == jobId)
                    .forEach(e -> this.logger.fine(String.format("Execution context for %s for coordinator %s already exists with local execution %s for coordinator %s", Util.jobAndExecutionId(jobId, executionId), coordinator, Util.idToString(e.executionId()), e.coordinator())));
            throw new RetryableHazelcastException();
        }
        Set<Address> addresses = participants.stream().map(MemberInfo::getAddress).collect(Collectors.toSet());
        ExecutionContext created = new ExecutionContext(this.nodeEngine, this.taskletExecutionService, jobId, executionId, coordinator, addresses);
        try {
            created.initialize(plan);
        } finally {
            // Registered even when initialize() throws; presumably so that a later
            // completeExecution() can find the context and release the job id and class loader.
            this.executionContexts.put(executionId, created);
        }
        this.logger.info("Execution plan for " + Util.jobAndExecutionId(jobId, executionId) + " initialized");
    }

    /**
     * Checks that the coordinator is the master, that the local member list is
     * not older than the coordinator's, that every participant is present in the
     * local member list, and that this member is itself a participant.
     */
    private void verifyClusterInformation(long jobId, long executionId, Address coordinator, int coordinatorMemberListVersion, Set<MemberInfo> participants) {
        Address masterAddress = this.nodeEngine.getMasterAddress();
        if (!coordinator.equals(masterAddress)) {
            // A stopped node is reported as such in preference to the coordinator mismatch.
            this.failIfNotRunning();
            throw new IllegalStateException(String.format("Coordinator %s cannot initialize %s. Reason: it is not the master, the master is %s", coordinator, Util.jobAndExecutionId(jobId, executionId), masterAddress));
        }
        ClusterServiceImpl clusterService = (ClusterServiceImpl) this.nodeEngine.getClusterService();
        MembershipManager membershipManager = clusterService.getMembershipManager();
        int localMemberListVersion = membershipManager.getMemberListVersion();
        Address thisAddress = this.nodeEngine.getThisAddress();
        if (coordinatorMemberListVersion > localMemberListVersion) {
            assert (!masterAddress.equals(thisAddress)) : String.format("Local node: %s is master but InitOperation has coordinator member list version: %s larger than  local member list version: %s", thisAddress, coordinatorMemberListVersion, localMemberListVersion);
            // Ask the master to publish its member list, then let the caller retry.
            this.nodeEngine.getOperationService().send(new TriggerMemberListPublishOp(), masterAddress);
            throw new RetryableHazelcastException(String.format("Cannot initialize %s for coordinator %s, local member list version %s, coordinator member list version %s", Util.jobAndExecutionId(jobId, executionId), coordinator, localMemberListVersion, coordinatorMemberListVersion));
        }
        boolean isLocalMemberParticipant = false;
        for (MemberInfo participant : participants) {
            if (participant.getAddress().equals(thisAddress)) {
                isLocalMemberParticipant = true;
            }
            if (membershipManager.getMember(participant.getAddress(), participant.getUuid()) == null) {
                throw new TopologyChangedException(String.format("Cannot initialize %s for coordinator %s: participant %s not found in local member list. Local member list version: %s, coordinator member list version: %s", Util.jobAndExecutionId(jobId, executionId), coordinator, participant, localMemberListVersion, coordinatorMemberListVersion));
            }
        }
        if (!isLocalMemberParticipant) {
            throw new IllegalArgumentException(String.format("Cannot initialize %s since member %s is not in participants: %s", Util.jobAndExecutionId(jobId, executionId), thisAddress, participants));
        }
    }

    /**
     * @throws HazelcastInstanceNotActiveException if the node engine reports it is not running
     */
    private void failIfNotRunning() {
        if (!this.nodeEngine.isRunning()) {
            throw new HazelcastInstanceNotActiveException();
        }
    }

    /**
     * Returns the execution context registered under {@code executionId} after
     * checking that the caller is entitled to operate on it.
     *
     * @param coordinator address of the member issuing the operation
     * @param jobId       id of the job the caller refers to
     * @param executionId id of the execution the caller refers to
     * @param callerOp    operation being performed; its simple class name is used in messages
     * @return the matching execution context, never {@code null}
     * @throws IllegalStateException    if the coordinator is not the master, or the context
     *                                  was created by a different coordinator or for a different job
     * @throws TopologyChangedException if no context is registered under {@code executionId}
     * @throws HazelcastInstanceNotActiveException if the node is not running
     */
    public ExecutionContext assertExecutionContext(Address coordinator, long jobId, long executionId, Operation callerOp) {
        Address masterAddress = this.nodeEngine.getMasterAddress();
        if (!coordinator.equals(masterAddress)) {
            this.failIfNotRunning();
            throw new IllegalStateException(String.format("Coordinator %s cannot do '%s' for %s: it is not the master, the master is %s", coordinator, callerOp.getClass().getSimpleName(), Util.jobAndExecutionId(jobId, executionId), masterAddress));
        }
        this.failIfNotRunning();
        ExecutionContext executionContext = this.executionContexts.get(executionId);
        if (executionContext == null) {
            throw new TopologyChangedException(String.format("%s not found for coordinator %s for '%s'", Util.jobAndExecutionId(jobId, executionId), coordinator, callerOp.getClass().getSimpleName()));
        }
        if (!executionContext.coordinator().equals(coordinator) || executionContext.jobId() != jobId) {
            throw new IllegalStateException(String.format("%s, originally from coordinator %s, cannot do '%s' by coordinator %s and execution %s", Util.jobAndExecutionId(jobId, executionContext.executionId()), executionContext.coordinator(), callerOp.getClass().getSimpleName(), coordinator, Util.idToString(executionId)));
        }
        return executionContext;
    }

    /**
     * Unregisters the execution context for {@code executionId} and completes
     * it. The job's class loader and job id registration are released even if
     * completing the context throws. An unknown execution id is only logged.
     *
     * @param executionId id of the execution to complete
     * @param error       passed through to {@code ExecutionContext.completeExecution}
     */
    public void completeExecution(long executionId, Throwable error) {
        ExecutionContext executionContext = this.executionContexts.remove(executionId);
        if (executionContext == null) {
            this.logger.fine("Execution " + Util.idToString(executionId) + " not found for completion");
            return;
        }
        try {
            executionContext.completeExecution(error);
        } finally {
            this.classLoaders.remove(executionContext.jobId());
            this.executionContextJobIds.remove(executionContext.jobId());
            this.logger.fine("Completed execution of " + Util.jobAndExecutionId(executionContext.jobId(), executionId));
        }
    }
}

