/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl;

import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.internal.cluster.MemberInfo;
import com.hazelcast.internal.cluster.impl.ClusterServiceImpl;
import com.hazelcast.internal.cluster.impl.MembersView;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TopologyChangedException;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedFunctions;
import com.hazelcast.jet.impl.JobCoordinationService;
import com.hazelcast.jet.impl.JobRecord;
import com.hazelcast.jet.impl.SnapshotRepository;
import com.hazelcast.jet.impl.execution.BroadcastEntry;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.execution.init.ExecutionPlanBuilder;
import com.hazelcast.jet.impl.operation.CompleteOperation;
import com.hazelcast.jet.impl.operation.ExecuteOperation;
import com.hazelcast.jet.impl.operation.InitOperation;
import com.hazelcast.jet.impl.operation.SnapshotOperation;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.query.TruePredicate;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.impl.NodeEngineImpl;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

public class MasterContext {
    public static final int SNAPSHOT_RESTORE_EDGE_PRIORITY = Integer.MIN_VALUE;
    private final NodeEngineImpl nodeEngine;
    private final JobCoordinationService coordinationService;
    private final ILogger logger;
    private final JobRecord jobRecord;
    private final long jobId;
    private final CompletableFuture<Boolean> completionFuture = new CompletableFuture();
    private final AtomicReference<JobStatus> jobStatus = new AtomicReference<JobStatus>(JobStatus.NOT_STARTED);
    private final SnapshotRepository snapshotRepository;
    private volatile Set<Vertex> vertices;
    private volatile long executionId;
    private volatile long jobStartTime;
    private volatile Map<MemberInfo, ExecutionPlan> executionPlanMap;

    MasterContext(NodeEngineImpl nodeEngine, JobCoordinationService coordinationService, JobRecord jobRecord) {
        this.nodeEngine = nodeEngine;
        this.coordinationService = coordinationService;
        this.snapshotRepository = coordinationService.snapshotRepository();
        this.logger = nodeEngine.getLogger(this.getClass());
        this.jobRecord = jobRecord;
        this.jobId = jobRecord.getJobId();
    }

    public long getJobId() {
        return this.jobId;
    }

    public long getExecutionId() {
        return this.executionId;
    }

    public JobStatus jobStatus() {
        return this.jobStatus.get();
    }

    public JobConfig getJobConfig() {
        return this.jobRecord.getConfig();
    }

    CompletableFuture<Boolean> completionFuture() {
        return this.completionFuture;
    }

    boolean cancel() {
        return this.completionFuture.cancel(true);
    }

    boolean isCancelled() {
        return this.completionFuture.isCancelled();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void tryStartJob(Function<Long, Long> executionIdSupplier) {
        if (!this.setJobStatusToStarting()) {
            return;
        }
        if (this.scheduleRestartIfQuorumAbsent() || this.scheduleRestartIfClusterIsNotSafe()) {
            return;
        }
        DAG dag = this.deserializeDAG();
        this.vertices = new HashSet<Vertex>();
        dag.iterator().forEachRemaining(this.vertices::add);
        this.executionId = executionIdSupplier.apply(this.jobId);
        long lastSnapshotId = -1L;
        if (this.isSnapshottingEnabled()) {
            Long snapshotIdToRestore = this.snapshotRepository.latestCompleteSnapshot(this.jobId);
            this.snapshotRepository.deleteAllSnapshotsExceptOne(this.jobId, snapshotIdToRestore);
            Long lastStartedSnapshot = this.snapshotRepository.latestStartedSnapshot(this.jobId);
            if (snapshotIdToRestore != null) {
                this.logger.info("State of " + this.jobIdString() + " will be restored from snapshot " + snapshotIdToRestore);
                this.rewriteDagWithSnapshotRestore(dag, snapshotIdToRestore);
            } else {
                this.logger.warning("No usable snapshot for " + this.jobIdString() + " found.");
            }
            if (lastStartedSnapshot != null) {
                lastSnapshotId = lastStartedSnapshot;
            }
        }
        MembersView membersView = this.getMembersView();
        ClassLoader previousCL = MasterContext.swapContextClassLoader(this.coordinationService.getClassLoader(this.jobId));
        try {
            this.logger.info("Start executing " + this.jobIdString() + ", status " + (Object)((Object)this.jobStatus()) + "\n" + dag);
            this.logger.fine("Building execution plan for " + this.jobIdString());
            this.executionPlanMap = ExecutionPlanBuilder.createExecutionPlans((NodeEngine)this.nodeEngine, membersView, dag, this.getJobConfig(), lastSnapshotId);
        }
        catch (Exception e) {
            this.logger.severe("Exception creating execution plan for " + this.jobIdString(), (Throwable)e);
            this.onCompleteStepCompleted(e);
            return;
        }
        finally {
            Thread.currentThread().setContextClassLoader(previousCL);
        }
        this.logger.fine("Built execution plans for " + this.jobIdString());
        Set<MemberInfo> participants = this.executionPlanMap.keySet();
        Function<ExecutionPlan, Operation> operationCtor = plan -> new InitOperation(this.jobId, this.executionId, membersView.getVersion(), participants, (ExecutionPlan)plan);
        this.invoke(operationCtor, this::onInitStepCompleted, null);
    }

    private void rewriteDagWithSnapshotRestore(DAG dag, long snapshotId) {
        this.logger.info(this.jobIdString() + ": restoring state from snapshotId=" + snapshotId);
        for (Vertex vertex : dag) {
            DistributedFunction projection = e -> e.getKey() instanceof BroadcastKey ? new BroadcastEntry(e) : e;
            String mapName = SnapshotRepository.snapshotDataMapName(this.jobId, snapshotId, vertex.getName());
            Vertex readSnapshotVertex = dag.newVertex("__read_snapshot." + vertex.getName(), SourceProcessors.readMapP(mapName, TruePredicate.truePredicate(), projection));
            readSnapshotVertex.localParallelism(vertex.getLocalParallelism());
            int destOrdinal = dag.getInboundEdges(vertex.getName()).size();
            dag.edge(new SnapshotRestoreEdge(readSnapshotVertex, vertex, destOrdinal));
        }
    }

    private boolean setJobStatusToStarting() {
        JobStatus status = this.jobStatus();
        if (status == JobStatus.COMPLETED || status == JobStatus.FAILED) {
            this.logger.severe("Cannot init job " + Util.idToString(this.jobId) + ": it is already " + (Object)((Object)status));
            return false;
        }
        if (this.completionFuture.isCancelled()) {
            this.logger.fine("Skipping init job " + Util.idToString(this.jobId) + ": is already cancelled.");
            this.onCompleteStepCompleted(null);
            return false;
        }
        if (status == JobStatus.NOT_STARTED) {
            if (!this.jobStatus.compareAndSet(JobStatus.NOT_STARTED, JobStatus.STARTING)) {
                this.logger.fine("Cannot init job " + Util.idToString(this.jobId) + ": someone else is just starting it");
                return false;
            }
            this.jobStartTime = System.currentTimeMillis();
        } else {
            this.jobStatus.compareAndSet(JobStatus.RUNNING, JobStatus.RESTARTING);
        }
        status = this.jobStatus();
        if (status != JobStatus.STARTING && status != JobStatus.RESTARTING) {
            this.logger.severe("Cannot init job " + Util.idToString(this.jobId) + ": status is " + (Object)((Object)status));
            return false;
        }
        return true;
    }

    private boolean scheduleRestartIfQuorumAbsent() {
        int quorumSize = this.jobRecord.getQuorumSize();
        if (this.coordinationService.isQuorumPresent(quorumSize)) {
            return false;
        }
        this.logger.fine("Rescheduling restart of job " + Util.idToString(this.jobId) + ": quorum size " + quorumSize + " is not met");
        this.scheduleRestart();
        return true;
    }

    private boolean scheduleRestartIfClusterIsNotSafe() {
        if (this.coordinationService.shouldStartJobs()) {
            return false;
        }
        this.logger.fine("Rescheduling restart of job " + Util.idToString(this.jobId) + ": cluster is not safe");
        this.scheduleRestart();
        return true;
    }

    private void scheduleRestart() {
        this.jobStatus.compareAndSet(JobStatus.RUNNING, JobStatus.RESTARTING);
        this.coordinationService.scheduleRestart(this.jobId);
    }

    private MembersView getMembersView() {
        ClusterServiceImpl clusterService = (ClusterServiceImpl)this.nodeEngine.getClusterService();
        return clusterService.getMembershipManager().getMembersView();
    }

    private DAG deserializeDAG() {
        ClassLoader cl = this.coordinationService.getClassLoader(this.jobId);
        return (DAG)CustomClassLoadedObject.deserializeWithCustomClassLoader(this.nodeEngine.getSerializationService(), cl, this.jobRecord.getDag());
    }

    private void onInitStepCompleted(Map<MemberInfo, Object> responses) {
        JobStatus status;
        Throwable error = this.getInitResult(responses);
        if (error == null && (status = this.jobStatus()) != JobStatus.STARTING && status != JobStatus.RESTARTING) {
            error = new IllegalStateException("Cannot execute " + this.jobIdString() + ": status is " + (Object)((Object)status));
        }
        if (error == null) {
            this.invokeExecute();
        } else {
            this.invokeComplete(error);
        }
    }

    private Throwable getInitResult(Map<MemberInfo, Object> responses) {
        if (this.completionFuture.isCancelled()) {
            this.logger.fine(this.jobIdString() + " to be cancelled after init");
            return new CancellationException();
        }
        Map<Boolean, List<Map.Entry<MemberInfo, Object>>> grouped = this.groupResponses(responses);
        Collection successfulMembers = grouped.get(false).stream().map(Map.Entry::getKey).collect(Collectors.toList());
        if (successfulMembers.size() == this.executionPlanMap.size()) {
            this.logger.fine("Init of " + this.jobIdString() + " is successful.");
            return null;
        }
        List<Map.Entry<MemberInfo, Object>> failures = grouped.get(true);
        this.logger.fine("Init of " + this.jobIdString() + " failed with: " + failures);
        return failures.stream().map(e -> (Throwable)e.getValue()).filter(t -> !ExceptionUtil.isTopologicalFailure(t)).findFirst().map(ExceptionUtil::peel).orElse((Throwable)((Object)new TopologyChangedException()));
    }

    private Map<Boolean, List<Map.Entry<MemberInfo, Object>>> groupResponses(Map<MemberInfo, Object> responses) {
        Map<Boolean, List<Map.Entry<MemberInfo, Object>>> grouped = responses.entrySet().stream().collect(Collectors.partitioningBy(e -> e.getValue() instanceof Throwable));
        grouped.putIfAbsent(true, Collections.emptyList());
        grouped.putIfAbsent(false, Collections.emptyList());
        return grouped;
    }

    private void invokeExecute() {
        this.jobStatus.set(JobStatus.RUNNING);
        this.logger.fine("Executing " + this.jobIdString());
        Function<ExecutionPlan, Operation> operationCtor = plan -> new ExecuteOperation(this.jobId, this.executionId);
        this.invoke(operationCtor, this::onExecuteStepCompleted, this.completionFuture);
        if (this.isSnapshottingEnabled()) {
            this.coordinationService.scheduleSnapshot(this.jobId, this.executionId);
        }
    }

    void beginSnapshot(long executionId) {
        if (this.executionId != executionId) {
            this.logger.warning("Not beginning snapshot since expected execution id " + Util.idToString(this.executionId) + " does not match to " + Util.jobAndExecutionId(this.jobId, executionId));
            return;
        }
        List<String> vertexNames = this.vertices.stream().map(Vertex::getName).collect(Collectors.toList());
        long newSnapshotId = this.snapshotRepository.registerSnapshot(this.jobId, vertexNames);
        this.logger.info(String.format("Starting snapshot %s for %s", newSnapshotId, Util.jobAndExecutionId(this.jobId, executionId)));
        Function<ExecutionPlan, Operation> factory = plan -> new SnapshotOperation(this.jobId, executionId, newSnapshotId);
        this.invoke(factory, responses -> this.onSnapshotCompleted((Map<MemberInfo, Object>)responses, executionId, newSnapshotId), null);
    }

    private void onSnapshotCompleted(Map<MemberInfo, Object> responses, long executionId, long snapshotId) {
        Map<Address, Throwable> errors = responses.entrySet().stream().filter(e -> e.getValue() instanceof Throwable).filter(e -> !(e.getValue() instanceof CancellationException) || !ExceptionUtil.isTopologicalFailure(e.getValue())).collect(Collectors.toMap(e -> ((MemberInfo)e.getKey()).getAddress(), e -> (Throwable)e.getValue()));
        boolean isSuccess = errors.isEmpty();
        if (!isSuccess) {
            this.logger.warning(Util.jobAndExecutionId(this.jobId, executionId) + " snapshot " + snapshotId + " has failures: " + errors);
        }
        this.coordinationService.completeSnapshot(this.jobId, executionId, snapshotId, isSuccess);
    }

    private void onExecuteStepCompleted(Map<MemberInfo, Object> responses) {
        this.invokeComplete(this.getExecuteResult(responses));
    }

    private Throwable getExecuteResult(Map<MemberInfo, Object> responses) {
        if (this.completionFuture.isCancelled()) {
            this.logger.fine(this.jobIdString() + " to be cancelled after execute");
            return new CancellationException();
        }
        Map<Boolean, List<Map.Entry<MemberInfo, Object>>> grouped = this.groupResponses(responses);
        Collection successfulMembers = grouped.get(false).stream().map(Map.Entry::getKey).collect(Collectors.toList());
        if (successfulMembers.size() == this.executionPlanMap.size()) {
            this.logger.fine("Execute of " + this.jobIdString() + " is successful.");
            return null;
        }
        List<Map.Entry<MemberInfo, Object>> failures = grouped.get(true);
        this.logger.fine("Execute of " + this.jobIdString() + " has failures: " + failures);
        return failures.stream().map(e -> (Throwable)e.getValue()).filter(t -> !(t instanceof CancellationException) && !ExceptionUtil.isTopologicalFailure(t)).findFirst().map(ExceptionUtil::peel).orElse((Throwable)((Object)new TopologyChangedException()));
    }

    private void invokeComplete(Throwable error) {
        Throwable finalError;
        JobStatus status = this.jobStatus();
        if (status == JobStatus.STARTING || status == JobStatus.RESTARTING || status == JobStatus.RUNNING) {
            this.logger.fine("Completing " + this.jobIdString());
            finalError = error;
        } else {
            if (error != null) {
                this.logger.severe("Cannot properly complete failed " + this.jobIdString() + ": status is " + (Object)((Object)status), error);
            } else {
                this.logger.severe("Cannot properly complete " + this.jobIdString() + ": status is " + (Object)((Object)status));
            }
            finalError = new IllegalStateException("Job coordination failed.");
        }
        Function<ExecutionPlan, Operation> operationCtor = plan -> new CompleteOperation(this.executionId, finalError);
        this.invoke(operationCtor, responses -> this.onCompleteStepCompleted(error), null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onCompleteStepCompleted(@Nullable Throwable failure) {
        if (this.assertJobNotAlreadyDone(failure)) {
            return;
        }
        this.completeVertices(failure);
        long completionTime = System.currentTimeMillis();
        if (failure instanceof TopologyChangedException && this.jobRecord.getConfig().isAutoRestartOnMemberFailureEnabled()) {
            this.scheduleRestart();
            return;
        }
        long elapsed = completionTime - this.jobStartTime;
        if (this.isSuccess(failure)) {
            this.logger.info("Execution of " + this.jobIdString() + " completed in " + elapsed + " ms");
        } else {
            this.logger.warning("Execution of " + this.jobIdString() + " failed in " + elapsed + " ms", failure);
        }
        try {
            this.coordinationService.completeJob(this, this.executionId, completionTime, failure);
        }
        catch (RuntimeException e) {
            this.logger.warning("Completion of " + this.jobIdString() + " failed in " + elapsed + " ms", failure);
        }
        finally {
            this.setFinalResult(failure);
        }
    }

    private boolean assertJobNotAlreadyDone(@Nullable Throwable failure) {
        JobStatus status = this.jobStatus();
        if (status == JobStatus.COMPLETED || status == JobStatus.FAILED) {
            if (failure != null) {
                this.logger.severe("Ignoring failure completion of " + Util.idToString(this.jobId) + " because status is " + (Object)((Object)status), failure);
            } else {
                this.logger.severe("Ignoring completion of " + Util.idToString(this.jobId) + " because status is " + (Object)((Object)status));
            }
            return true;
        }
        return false;
    }

    private void completeVertices(@Nullable Throwable failure) {
        for (Vertex vertex : this.vertices) {
            try {
                vertex.getMetaSupplier().complete(failure);
            }
            catch (Exception e) {
                this.logger.severe(this.jobIdString() + " encountered an exception in ProcessorMetaSupplier.complete(), ignoring it", (Throwable)e);
            }
        }
    }

    void setFinalResult(Throwable failure) {
        JobStatus status = this.isSuccess(failure) ? JobStatus.COMPLETED : JobStatus.FAILED;
        this.jobStatus.set(status);
        if (status == JobStatus.COMPLETED) {
            this.completionFuture.complete(true);
        } else {
            this.completionFuture.completeExceptionally(failure);
        }
    }

    private boolean isSuccess(Throwable failure) {
        return failure == null || failure instanceof CancellationException;
    }

    private void invoke(Function<ExecutionPlan, Operation> operationCtor, Consumer<Map<MemberInfo, Object>> completionCallback, CompletableFuture cancellationFuture) {
        boolean cancelOnFailure;
        CompletableFuture<Void> doneFuture = new CompletableFuture<Void>();
        final ConcurrentHashMap<MemberInfo, InternalCompletableFuture<Object>> futures = new ConcurrentHashMap<MemberInfo, InternalCompletableFuture<Object>>();
        this.invokeOnParticipants(futures, doneFuture, operationCtor);
        doneFuture.whenComplete((BiConsumer)ExceptionUtil.withTryCatch(this.logger, (aVoid, throwable) -> {
            HashMap responses = new HashMap();
            for (Map.Entry entry : futures.entrySet()) {
                Object val;
                try {
                    val = ((InternalCompletableFuture)entry.getValue()).get();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    val = e;
                }
                catch (Exception e) {
                    val = ExceptionUtil.peel(e);
                }
                responses.put(entry.getKey(), val);
            }
            completionCallback.accept(responses);
        }));
        boolean bl = cancelOnFailure = cancellationFuture != null;
        if (cancelOnFailure) {
            cancellationFuture.whenComplete(ExceptionUtil.withTryCatch(this.logger, (r, e) -> {
                if (e instanceof CancellationException) {
                    futures.values().forEach(f -> f.cancel(true));
                }
            }));
            ExecutionCallback<Object> callback = new ExecutionCallback<Object>(){

                public void onResponse(Object response) {
                }

                public void onFailure(Throwable t) {
                    futures.values().forEach(f -> f.cancel(true));
                }
            };
            futures.values().forEach(arg_0 -> MasterContext.lambda$invoke$18((ExecutionCallback)callback, arg_0));
        }
    }

    private void invokeOnParticipants(Map<MemberInfo, InternalCompletableFuture<Object>> futures, CompletableFuture<Void> doneFuture, Function<ExecutionPlan, Operation> opCtor) {
        AtomicInteger remainingCount = new AtomicInteger(this.executionPlanMap.size());
        for (Map.Entry<MemberInfo, ExecutionPlan> e : this.executionPlanMap.entrySet()) {
            MemberInfo member = e.getKey();
            Operation op = opCtor.apply(e.getValue());
            InternalCompletableFuture future = this.nodeEngine.getOperationService().createInvocationBuilder("hz:impl:jetService", op, member.getAddress()).setDoneCallback(() -> {
                if (remainingCount.decrementAndGet() == 0) {
                    doneFuture.complete(null);
                }
            }).invoke();
            futures.put(member, (InternalCompletableFuture<Object>)future);
        }
    }

    private boolean isSnapshottingEnabled() {
        return this.getJobConfig().getProcessingGuarantee() != ProcessingGuarantee.NONE;
    }

    private String jobIdString() {
        return Util.jobAndExecutionId(this.jobId, this.executionId);
    }

    private static ClassLoader swapContextClassLoader(ClassLoader jobClassLoader) {
        Thread currentThread = Thread.currentThread();
        ClassLoader contextClassLoader = currentThread.getContextClassLoader();
        currentThread.setContextClassLoader(jobClassLoader);
        return contextClassLoader;
    }

    private static /* synthetic */ void lambda$invoke$18(ExecutionCallback callback, InternalCompletableFuture f) {
        f.andThen(callback);
    }

    private static class SnapshotRestoreEdge
    extends Edge {
        SnapshotRestoreEdge(Vertex source, Vertex destination, int destOrdinal) {
            super(source, 0, destination, destOrdinal);
            this.distributed();
            this.partitioned(DistributedFunctions.entryKey());
        }

        @Override
        public int getPriority() {
            return Integer.MIN_VALUE;
        }
    }
}

