/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.metrics.ProbeBuilder;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.execution.BroadcastItem;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.OutboundEdgeStream;
import com.hazelcast.jet.impl.execution.OutboxImpl;
import com.hazelcast.jet.impl.execution.ProcessorState;
import com.hazelcast.jet.impl.execution.SnapshotBarrier;
import com.hazelcast.jet.impl.execution.SnapshotContext;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import com.hazelcast.jet.impl.processor.ProcessorWrapper;
import com.hazelcast.jet.impl.util.ArrayDequeInbox;
import com.hazelcast.jet.impl.util.CircularListCursor;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.Preconditions;
import com.hazelcast.util.function.Predicate;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public class ProcessorTasklet
implements Tasklet {
    private static final int OUTBOX_BATCH_SIZE = 2048;
    private final ProgressTracker progTracker = new ProgressTracker();
    private final OutboundEdgeStream[] outstreams;
    private final OutboxImpl outbox;
    private final Processor.Context context;
    private final Processor processor;
    private final SnapshotContext ssContext;
    private final BitSet receivedBarriers;
    private final ArrayDequeInbox inbox = new ArrayDequeInbox(this.progTracker);
    private final Queue<ArrayList<InboundEdgeStream>> instreamGroupQueue;
    private final WatermarkCoalescer watermarkCoalescer;
    private final ILogger logger;
    private final SerializationService serializationService;
    private int numActiveOrdinals;
    private CircularListCursor<InboundEdgeStream> instreamCursor;
    private InboundEdgeStream currInstream;
    private ProcessorState state;
    private long pendingSnapshotId;
    private SnapshotBarrier currentBarrier;
    private Watermark pendingWatermark;
    private boolean processorClosed;
    private boolean waitForAllBarriers;
    private final AtomicLongArray receivedCounts;
    private final AtomicLongArray receivedBatches;
    private final AtomicLongArray emittedCounts;
    private final AtomicLong queuesSize = new AtomicLong();
    private final AtomicLong queuesCapacity = new AtomicLong();
    private final Predicate<Object> addToInboxFunction = this.inbox.queue()::add;

    public ProcessorTasklet(@Nonnull Processor.Context context, @Nonnull SerializationService serializationService, @Nonnull Processor processor, @Nonnull List<? extends InboundEdgeStream> instreams, @Nonnull List<? extends OutboundEdgeStream> outstreams, @Nonnull SnapshotContext ssContext, @Nonnull OutboundCollector ssCollector, @Nullable ProbeBuilder probeBuilder) {
        Preconditions.checkNotNull((Object)processor, (String)"processor");
        this.context = context;
        this.serializationService = serializationService;
        this.processor = processor;
        this.numActiveOrdinals = instreams.size();
        this.instreamGroupQueue = new ArrayDeque(instreams.stream().collect(Collectors.groupingBy(InboundEdgeStream::priority, TreeMap::new, Collectors.toCollection(ArrayList::new))).values());
        this.outstreams = (OutboundEdgeStream[])outstreams.stream().sorted(Comparator.comparing(OutboundEdgeStream::ordinal)).toArray(OutboundEdgeStream[]::new);
        this.ssContext = ssContext;
        this.logger = this.getLogger(context);
        this.instreamCursor = this.popInstreamGroup();
        this.receivedCounts = new AtomicLongArray(instreams.size());
        this.receivedBatches = new AtomicLongArray(instreams.size());
        this.emittedCounts = new AtomicLongArray(outstreams.size() + 1);
        this.outbox = this.createOutbox(ssCollector);
        this.receivedBarriers = new BitSet(instreams.size());
        this.state = this.initialProcessingState();
        this.pendingSnapshotId = ssContext.activeSnapshotId() + 1L;
        this.waitForAllBarriers = ssContext.processingGuarantee() == ProcessingGuarantee.EXACTLY_ONCE;
        this.watermarkCoalescer = WatermarkCoalescer.create(instreams.size());
        if (probeBuilder != null) {
            this.registerMetrics(instreams, probeBuilder);
        }
    }

    @SuppressFBWarnings(value={"RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE"}, justification="jetInstance() can be null in TestProcessorContext")
    private ILogger getLogger(@Nonnull Processor.Context context) {
        return context.jetInstance() != null ? context.jetInstance().getHazelcastInstance().getLoggingService().getLogger(this.getClass() + "." + this.toString()) : Logger.getLogger(this.getClass());
    }

    private void registerMetrics(List<? extends InboundEdgeStream> instreams, ProbeBuilder probeBuilder) {
        int finalI;
        int i;
        for (i = 0; i < instreams.size(); ++i) {
            finalI = i;
            ProbeBuilder builderWithOrdinal = probeBuilder.withTag("ordinal", String.valueOf(i));
            builderWithOrdinal.register((Object)this, "receivedCount", ProbeLevel.INFO, ProbeUnit.COUNT, t -> t.receivedCounts.get(finalI));
            builderWithOrdinal.register((Object)this, "receivedBatches", ProbeLevel.INFO, ProbeUnit.COUNT, t -> t.receivedBatches.get(finalI));
            InboundEdgeStream instream = instreams.get(finalI);
            builderWithOrdinal.register((Object)this, "topObservedWm", ProbeLevel.INFO, ProbeUnit.MS, t -> instream.topObservedWm());
            builderWithOrdinal.register((Object)this, "coalescedWm", ProbeLevel.INFO, ProbeUnit.MS, t -> instream.coalescedWm());
        }
        for (i = 0; i < this.emittedCounts.length() - (this.context.snapshottingEnabled() ? 0 : 1); ++i) {
            finalI = i;
            probeBuilder.withTag("ordinal", i == this.emittedCounts.length() - 1 ? "snapshot" : String.valueOf(i)).register((Object)this, "emittedCount", ProbeLevel.INFO, ProbeUnit.COUNT, t -> t.emittedCounts.get(finalI));
        }
        probeBuilder.register((Object)this, "topObservedWm", ProbeLevel.INFO, ProbeUnit.MS, t -> t.watermarkCoalescer.topObservedWm());
        probeBuilder.register((Object)this, "coalescedWm", ProbeLevel.INFO, ProbeUnit.MS, t -> t.watermarkCoalescer.coalescedWm());
        probeBuilder.register((Object)this, "lastForwardedWm", ProbeLevel.INFO, ProbeUnit.MS, t -> t.outbox.lastForwardedWm());
        probeBuilder.register((Object)this, "lastForwardedWmLatency", ProbeLevel.INFO, ProbeUnit.MS, t -> this.lastForwardedWmLatency());
        probeBuilder.register((Object)this, "queuesSize", ProbeLevel.INFO, ProbeUnit.COUNT, t -> t.queuesSize.get());
        probeBuilder.register((Object)this, "queuesCapacity", ProbeLevel.INFO, ProbeUnit.COUNT, t -> t.queuesCapacity.get());
    }

    private OutboxImpl createOutbox(@Nonnull OutboundCollector ssCollector) {
        OutboundCollector[] collectors = new OutboundCollector[this.outstreams.length + 1];
        for (int i = 0; i < this.outstreams.length; ++i) {
            collectors[i] = this.outstreams[i].getCollector();
        }
        collectors[this.outstreams.length] = ssCollector;
        return new OutboxImpl(collectors, true, this.progTracker, this.serializationService, 2048, this.emittedCounts);
    }

    @Override
    public void init() {
        if (this.serializationService.getManagedContext() != null) {
            Processor toInit = this.processor instanceof ProcessorWrapper ? ((ProcessorWrapper)this.processor).getWrapped() : this.processor;
            Object initialized = this.serializationService.getManagedContext().initialize((Object)toInit);
            assert (initialized == toInit) : "different object returned";
        }
        try {
            this.processor.init(this.outbox, this.context);
        }
        catch (Exception e) {
            throw ExceptionUtil.sneakyThrow(e);
        }
    }

    @Override
    @Nonnull
    public ProgressState call() {
        assert (!this.processorClosed) : "processor closed";
        this.progTracker.reset();
        this.outbox.reset();
        this.stateMachineStep();
        ProgressState progressState = this.progTracker.toProgressState();
        if (progressState.isDone()) {
            this.closeProcessor();
            this.processorClosed = true;
        }
        return progressState;
    }

    private void closeProcessor() {
        assert (!this.processorClosed) : "processor already closed";
        try {
            this.processor.close();
        }
        catch (Exception e) {
            this.logger.severe(Util.jobNameAndExecutionId(this.context.jobConfig().getName(), this.context.executionId()) + " encountered an exception in Processor.close(), ignoring it", (Throwable)e);
        }
    }

    private void stateMachineStep() {
        switch (this.state) {
            case PROCESS_WATERMARK: {
                this.progTracker.notDone();
                if (this.pendingWatermark == null) {
                    long wm = this.watermarkCoalescer.checkWmHistory();
                    if (wm == Long.MIN_VALUE) {
                        this.state = ProcessorState.PROCESS_INBOX;
                        this.stateMachineStep();
                        break;
                    }
                    this.pendingWatermark = new Watermark(wm);
                }
                if (!(this.pendingWatermark.equals(WatermarkCoalescer.IDLE_MESSAGE) ? this.outbox.offer(WatermarkCoalescer.IDLE_MESSAGE) : this.processor.tryProcessWatermark(this.pendingWatermark))) break;
                this.state = ProcessorState.PROCESS_INBOX;
                this.pendingWatermark = null;
                this.stateMachineStep();
                break;
            }
            case PROCESS_INBOX: {
                this.progTracker.notDone();
                if (this.inbox.isEmpty()) {
                    if (this.isSnapshotInbox() || this.processor.tryProcess()) {
                        assert (!this.outbox.hasUnfinishedItem()) : this.isSnapshotInbox() ? "Unfinished item before fillInbox call" : "Processor.tryProcess() returned true, but there's unfinished item in the outbox";
                        this.fillInbox();
                    } else {
                        return;
                    }
                }
                if (!this.inbox.isEmpty()) {
                    if (this.isSnapshotInbox()) {
                        this.processor.restoreFromSnapshot(this.inbox);
                    } else {
                        this.processor.process(this.currInstream.ordinal(), this.inbox);
                    }
                }
                if (this.inbox.isEmpty()) {
                    if (this.currInstream != null && this.currInstream.isDone()) {
                        this.state = ProcessorState.COMPLETE_EDGE;
                        this.progTracker.madeProgress();
                        return;
                    }
                    if (this.numActiveOrdinals > 0 && this.receivedBarriers.cardinality() == this.numActiveOrdinals) {
                        this.state = ProcessorState.SAVE_SNAPSHOT;
                        return;
                    }
                    if (this.numActiveOrdinals == 0) {
                        this.progTracker.madeProgress();
                        this.state = ProcessorState.COMPLETE;
                    } else {
                        this.state = ProcessorState.PROCESS_WATERMARK;
                    }
                }
                return;
            }
            case COMPLETE_EDGE: {
                this.progTracker.notDone();
                if (this.isSnapshotInbox() ? this.processor.finishSnapshotRestore() : this.processor.completeEdge(this.currInstream.ordinal())) {
                    assert (!this.outbox.hasUnfinishedItem()) : "outbox has unfinished item after successful completeEdge() or finishSnapshotRestore()";
                    this.progTracker.madeProgress();
                    this.state = this.initialProcessingState();
                }
                return;
            }
            case SAVE_SNAPSHOT: {
                this.progTracker.notDone();
                if (this.processor.saveToSnapshot()) {
                    this.progTracker.madeProgress();
                    this.state = ProcessorState.EMIT_BARRIER;
                }
                return;
            }
            case EMIT_BARRIER: {
                assert (this.currentBarrier != null) : "currentBarrier == null";
                if (this.outbox.offerToEdgesAndSnapshot(this.currentBarrier)) {
                    this.progTracker.madeProgress();
                    if (this.currentBarrier.isTerminal()) {
                        this.state = ProcessorState.EMIT_DONE_ITEM;
                    } else {
                        this.currentBarrier = null;
                        this.receivedBarriers.clear();
                        ++this.pendingSnapshotId;
                        this.state = this.initialProcessingState();
                    }
                }
                this.progTracker.notDone();
                return;
            }
            case COMPLETE: {
                this.progTracker.notDone();
                long currSnapshotId = this.ssContext.activeSnapshotId();
                assert (currSnapshotId <= this.pendingSnapshotId) : "Unexpected new snapshot id " + currSnapshotId + ", current was" + this.pendingSnapshotId;
                if (currSnapshotId == this.pendingSnapshotId) {
                    if (this.outbox.hasUnfinishedItem()) {
                        this.outbox.block();
                    } else {
                        this.outbox.unblock();
                        this.state = ProcessorState.SAVE_SNAPSHOT;
                        this.currentBarrier = new SnapshotBarrier(currSnapshotId, this.ssContext.isTerminalSnapshot());
                        this.progTracker.madeProgress();
                        return;
                    }
                }
                if (this.processor.complete()) {
                    this.progTracker.madeProgress();
                    this.state = ProcessorState.EMIT_DONE_ITEM;
                }
                return;
            }
            case EMIT_DONE_ITEM: {
                if (!this.outbox.offerToEdgesAndSnapshot(DoneItem.DONE_ITEM)) {
                    this.progTracker.notDone();
                    return;
                }
                this.state = ProcessorState.END;
                return;
            }
            default: {
                throw new JetException("Unexpected state: " + (Object)((Object)this.state));
            }
        }
    }

    private void fillInbox() {
        ProgressState result;
        assert (this.inbox.isEmpty()) : "inbox is not empty";
        assert (this.pendingWatermark == null) : "null wm expected, but was " + this.pendingWatermark;
        if (this.instreamCursor == null) {
            return;
        }
        InboundEdgeStream first = this.instreamCursor.value();
        do {
            this.currInstream = this.instreamCursor.value();
            result = ProgressState.NO_PROGRESS;
            if (this.waitForAllBarriers && this.receivedBarriers.get(this.currInstream.ordinal())) {
                this.instreamCursor.advance();
                continue;
            }
            result = this.currInstream.drainTo(this.addToInboxFunction);
            this.progTracker.madeProgress(result.isMadeProgress());
            Object lastItem = this.inbox.queue().peekLast();
            if (lastItem instanceof Watermark) {
                long newWmValue = ((Watermark)this.inbox.queue().removeLast()).timestamp();
                long wm = this.watermarkCoalescer.observeWm(this.currInstream.ordinal(), newWmValue);
                if (wm != Long.MIN_VALUE) {
                    this.pendingWatermark = new Watermark(wm);
                }
            } else if (lastItem instanceof SnapshotBarrier) {
                SnapshotBarrier barrier = (SnapshotBarrier)this.inbox.queue().removeLast();
                this.observeBarrier(this.currInstream.ordinal(), barrier);
            } else if (lastItem != null && !(lastItem instanceof BroadcastItem)) {
                this.watermarkCoalescer.observeEvent(this.currInstream.ordinal());
            }
            if (result.isDone()) {
                this.receivedBarriers.clear(this.currInstream.ordinal());
                long wm = this.watermarkCoalescer.queueDone(this.currInstream.ordinal());
                if (wm != Long.MIN_VALUE) {
                    assert (this.pendingWatermark == null || this.pendingWatermark.timestamp() < wm) : "trying to assign lower WM. Old=" + this.pendingWatermark.timestamp() + ", new=" + wm;
                    this.pendingWatermark = new Watermark(wm);
                }
                this.instreamCursor.remove();
                --this.numActiveOrdinals;
            }
            if (this.instreamCursor.advance()) continue;
            this.instreamCursor = this.popInstreamGroup();
            break;
        } while (!result.isMadeProgress() && this.instreamCursor.value() != first);
        Util.lazyAdd(this.receivedCounts, this.currInstream.ordinal(), this.inbox.size());
        if (!this.inbox.isEmpty()) {
            Util.lazyIncrement(this.receivedBatches, this.currInstream.ordinal());
        }
        this.queuesCapacity.lazySet(this.instreamCursor == null ? 0L : (long)Util.sum(this.instreamCursor.getList(), InboundEdgeStream::capacities));
        this.queuesSize.lazySet(this.instreamCursor == null ? 0L : (long)Util.sum(this.instreamCursor.getList(), InboundEdgeStream::sizes));
    }

    private CircularListCursor<InboundEdgeStream> popInstreamGroup() {
        return Optional.ofNullable(this.instreamGroupQueue.poll()).map(CircularListCursor::new).orElse(null);
    }

    public String toString() {
        String jobPrefix = this.context.jobConfig().getName() == null ? "" : this.context.jobConfig().getName() + "/";
        return "ProcessorTasklet{" + jobPrefix + this.context.vertexName() + '#' + this.context.globalProcessorIndex() + '}';
    }

    private void observeBarrier(int ordinal, SnapshotBarrier barrier) {
        if (barrier.snapshotId() != this.pendingSnapshotId) {
            throw new JetException("Unexpected snapshot barrier ID " + barrier.snapshotId() + " from ordinal " + ordinal + " expected " + this.pendingSnapshotId);
        }
        this.currentBarrier = barrier;
        if (barrier.isTerminal()) {
            this.waitForAllBarriers = true;
        }
        this.receivedBarriers.set(ordinal);
    }

    private ProcessorState initialProcessingState() {
        return this.pendingWatermark != null ? ProcessorState.PROCESS_WATERMARK : (this.instreamCursor == null ? ProcessorState.COMPLETE : ProcessorState.PROCESS_INBOX);
    }

    private boolean isSnapshotInbox() {
        return this.currInstream != null && this.currInstream.priority() == Integer.MIN_VALUE;
    }

    private long lastForwardedWmLatency() {
        long wm = this.outbox.lastForwardedWm();
        if (wm == WatermarkCoalescer.IDLE_MESSAGE.timestamp()) {
            return Long.MIN_VALUE;
        }
        if (wm == Long.MIN_VALUE) {
            return Long.MAX_VALUE;
        }
        return System.currentTimeMillis() - wm;
    }

    @Override
    public boolean isCooperative() {
        return this.processor.isCooperative();
    }

    @Override
    public void close() {
        if (!this.processorClosed) {
            this.closeProcessor();
            this.processorClosed = true;
        }
    }
}

