/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.cluster.Address;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.MwCounter;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.JetService;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.JobTerminateRequestedException;
import com.hazelcast.jet.impl.exception.TerminatedWithSnapshotException;
import com.hazelcast.jet.impl.execution.ReceiverTasklet;
import com.hazelcast.jet.impl.execution.SenderTasklet;
import com.hazelcast.jet.impl.execution.SnapshotContext;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.execution.TaskletExecutionService;
import com.hazelcast.jet.impl.execution.init.ExecutionPlan;
import com.hazelcast.jet.impl.metrics.RawJobMetrics;
import com.hazelcast.jet.impl.operation.SnapshotPhase1Operation;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;

public class ExecutionContext
implements DynamicMetricsProvider {
    private final long jobId;
    private final long executionId;
    private final Address coordinator;
    private final Set<Address> participants;
    private final Object executionLock = new Object();
    private final ILogger logger;
    private final Counter startTime = MwCounter.newMwCounter((long)-1L);
    private final Counter completionTime = MwCounter.newMwCounter((long)-1L);
    private final ConcurrentHashMap<String, File> tempDirectories = new ConcurrentHashMap();
    private String jobName;
    private Map<Integer, Map<Integer, Map<Address, ReceiverTasklet>>> receiverMap = Collections.emptyMap();
    private Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> senderMap = Collections.emptyMap();
    private List<ProcessorSupplier> procSuppliers = Collections.emptyList();
    private List<Tasklet> tasklets = Collections.emptyList();
    private volatile CompletableFuture<Void> executionFuture;
    private final CompletableFuture<Void> cancellationFuture = new CompletableFuture();
    private final NodeEngine nodeEngine;
    private final TaskletExecutionService taskletExecService;
    private SnapshotContext snapshotContext;
    private JobConfig jobConfig;
    private boolean metricsEnabled;
    private volatile RawJobMetrics jobMetrics = RawJobMetrics.empty();
    private InternalSerializationService serializationService;

    public ExecutionContext(NodeEngine nodeEngine, TaskletExecutionService taskletExecService, long jobId, long executionId, Address coordinator, Set<Address> participants) {
        this.jobId = jobId;
        this.executionId = executionId;
        this.coordinator = coordinator;
        this.participants = participants;
        this.taskletExecService = taskletExecService;
        this.nodeEngine = nodeEngine;
        this.jobName = Util.idToString(jobId);
        this.logger = nodeEngine.getLogger(this.getClass());
    }

    public ExecutionContext initialize(ExecutionPlan plan) {
        this.jobConfig = plan.getJobConfig();
        this.jobName = this.jobConfig.getName() == null ? this.jobName : this.jobConfig.getName();
        this.procSuppliers = Collections.unmodifiableList(plan.getProcessorSuppliers());
        this.snapshotContext = new SnapshotContext(this.nodeEngine.getLogger(SnapshotContext.class), this.jobNameAndExecutionId(), plan.lastSnapshotId(), this.jobConfig.getProcessingGuarantee());
        JetService jetService = (JetService)this.nodeEngine.getService("hz:impl:jetService");
        this.serializationService = jetService.createSerializationService(this.jobConfig.getSerializerConfigs());
        this.metricsEnabled = this.jobConfig.isMetricsEnabled() && this.nodeEngine.getConfig().getMetricsConfig().isEnabled();
        plan.initialize(this.nodeEngine, this.jobId, this.executionId, this.snapshotContext, this.tempDirectories, this.serializationService);
        this.snapshotContext.initTaskletCount(plan.getProcessorTaskletCount(), plan.getStoreSnapshotTaskletCount(), plan.getHigherPriorityVertexCount());
        this.receiverMap = Collections.unmodifiableMap(plan.getReceiverMap());
        this.senderMap = Collections.unmodifiableMap(plan.getSenderMap());
        this.tasklets = plan.getTasklets();
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> beginExecution() {
        Object object = this.executionLock;
        synchronized (object) {
            if (this.executionFuture != null) {
                return this.executionFuture;
            }
            JetService service = (JetService)this.nodeEngine.getService("hz:impl:jetService");
            ClassLoader cl = service.getJobExecutionService().getClassLoader(this.jobConfig, this.jobId);
            this.executionFuture = this.taskletExecService.beginExecute(this.tasklets, this.cancellationFuture, cl).thenApply(res -> {
                if (this.snapshotContext.isTerminalSnapshot()) {
                    throw new TerminatedWithSnapshotException();
                }
                return res;
            });
            this.startTime.set(System.currentTimeMillis());
            return this.executionFuture;
        }
    }

    public void completeExecution(Throwable error) {
        assert (this.executionFuture == null || this.executionFuture.isDone()) : "If execution was begun, then completeExecution() should not be called before execution is done.";
        for (Tasklet tasklet : this.tasklets) {
            try {
                tasklet.close();
            }
            catch (Throwable e) {
                this.logger.severe(this.jobNameAndExecutionId() + " encountered an exception in Processor.close(), ignoring it", e);
            }
        }
        for (ProcessorSupplier s : this.procSuppliers) {
            try {
                s.close(error);
            }
            catch (Throwable e) {
                this.logger.severe(this.jobNameAndExecutionId() + " encountered an exception in ProcessorSupplier.close(), ignoring it", e);
            }
        }
        this.tempDirectories.forEach((k, dir) -> {
            try {
                IOUtil.delete((File)dir);
            }
            catch (Exception e) {
                this.logger.warning("Failed to delete temporary directory " + dir);
            }
        });
        if (this.serializationService != null) {
            this.serializationService.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> terminateExecution(@Nullable TerminationMode mode) {
        assert (mode == null || !mode.isWithTerminalSnapshot()) : "terminating with a mode that should do a terminal snapshot";
        Object object = this.executionLock;
        synchronized (object) {
            if (mode == null) {
                this.cancellationFuture.cancel(true);
            } else {
                this.cancellationFuture.completeExceptionally((Throwable)((Object)new JobTerminateRequestedException(mode)));
            }
            if (this.executionFuture == null) {
                this.executionFuture = this.cancellationFuture;
            }
            this.snapshotContext.cancel();
            return this.executionFuture;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<SnapshotPhase1Operation.SnapshotPhase1Result> beginSnapshotPhase1(long snapshotId, String mapName, int flags) {
        LoggingUtil.logFine(this.logger, "Starting snapshot %d phase 1 for %s on member", snapshotId, this.jobNameAndExecutionId());
        Object object = this.executionLock;
        synchronized (object) {
            if (this.cancellationFuture.isDone()) {
                throw new CancellationException();
            }
            if (this.executionFuture != null && this.executionFuture.isDone()) {
                LoggingUtil.logFine(this.logger, "Ignoring snapshot %d phase 1 for %s: execution completed", snapshotId, this.jobNameAndExecutionId());
                return CompletableFuture.completedFuture(new SnapshotPhase1Operation.SnapshotPhase1Result(0L, 0L, 0L, null));
            }
            return this.snapshotContext.startNewSnapshotPhase1(snapshotId, mapName, flags);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> beginSnapshotPhase2(long snapshotId, boolean success) {
        LoggingUtil.logFine(this.logger, "Starting snapshot %d phase 2 for %s on member", snapshotId, this.jobNameAndExecutionId());
        Object object = this.executionLock;
        synchronized (object) {
            if (this.cancellationFuture.isDone()) {
                throw new CancellationException();
            }
            if (this.executionFuture != null && this.executionFuture.isDone()) {
                LoggingUtil.logFine(this.logger, "Ignoring snapshot %d phase 2 for %s: execution completed", snapshotId, this.jobNameAndExecutionId());
                return CompletableFuture.completedFuture(null);
            }
            return this.snapshotContext.startNewSnapshotPhase2(snapshotId, success);
        }
    }

    public void handlePacket(int vertexId, int ordinal, Address sender, byte[] payload, int offset) {
        this.receiverMap.get(vertexId).get(ordinal).get(sender).receiveStreamPacket(payload, offset);
    }

    public boolean hasParticipant(Address member) {
        return this.participants.contains(member);
    }

    public long jobId() {
        return this.jobId;
    }

    public long executionId() {
        return this.executionId;
    }

    public String jobNameAndExecutionId() {
        return com.hazelcast.jet.impl.util.Util.jobNameAndExecutionId(this.jobName, this.executionId);
    }

    public Address coordinator() {
        return this.coordinator;
    }

    public Map<Integer, Map<Integer, Map<Address, SenderTasklet>>> senderMap() {
        return this.senderMap;
    }

    public Map<Integer, Map<Integer, Map<Address, ReceiverTasklet>>> receiverMap() {
        return this.receiverMap;
    }

    @Nullable
    public String jobName() {
        return this.jobName;
    }

    public RawJobMetrics getJobMetrics() {
        return this.jobMetrics;
    }

    public void setJobMetrics(RawJobMetrics jobMetrics) {
        this.jobMetrics = jobMetrics;
    }

    public void provideDynamicMetrics(MetricDescriptor descriptor, MetricsCollectionContext context) {
        if (!this.metricsEnabled) {
            return;
        }
        descriptor = descriptor.withTag("job", Util.idToString(this.jobId)).withTag("exec", Util.idToString(this.executionId));
        context.collect(descriptor, "executionStartTime", ProbeLevel.INFO, ProbeUnit.MS, this.startTime.get());
        context.collect(descriptor, "executionCompletionTime", ProbeLevel.INFO, ProbeUnit.MS, this.completionTime.get());
        for (Tasklet tasklet : this.tasklets) {
            tasklet.provideDynamicMetrics(descriptor.copy(), context);
        }
    }

    public void setCompletionTime() {
        this.completionTime.set(System.currentTimeMillis());
    }
}

