/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.JetException;
import com.hazelcast.jet.impl.SnapshotRepository;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.SnapshotBarrier;
import com.hazelcast.jet.impl.execution.SnapshotContext;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.AsyncMapWriter;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.NodeEngine;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;

public class StoreSnapshotTasklet
implements Tasklet {
    long pendingSnapshotId;
    private final ProgressTracker progTracker = new ProgressTracker();
    private final long jobId;
    private final InboundEdgeStream inboundEdgeStream;
    private final SnapshotContext snapshotContext;
    private final AsyncMapWriter mapWriter;
    private final boolean isHigherPrioritySource;
    private final String vertexName;
    private final ILogger logger;
    private final AtomicInteger numActiveFlushes = new AtomicInteger();
    private State state = State.DRAIN;
    private boolean hasReachedBarrier;
    private boolean inputIsDone;

    public StoreSnapshotTasklet(SnapshotContext snapshotContext, long jobId, InboundEdgeStream inboundEdgeStream, NodeEngine nodeEngine, String vertexName, boolean isHigherPrioritySource) {
        this.snapshotContext = snapshotContext;
        this.jobId = jobId;
        this.inboundEdgeStream = inboundEdgeStream;
        this.vertexName = vertexName;
        this.isHigherPrioritySource = isHigherPrioritySource;
        this.mapWriter = new AsyncMapWriter(nodeEngine);
        this.pendingSnapshotId = snapshotContext.lastSnapshotId() + 1L;
        this.mapWriter.setMapName(this.currMapName());
        this.logger = nodeEngine.getLogger(StoreSnapshotTasklet.class + "." + vertexName + "#snapshot");
    }

    @Override
    @Nonnull
    public ProgressState call() {
        this.progTracker.reset();
        this.stateMachineStep();
        return this.progTracker.toProgressState();
    }

    private void stateMachineStep() {
        switch (this.state) {
            case DRAIN: {
                this.progTracker.notDone();
                ProgressState result = this.inboundEdgeStream.drainTo(o -> {
                    if (o instanceof SnapshotBarrier) {
                        SnapshotBarrier barrier = (SnapshotBarrier)o;
                        assert (this.pendingSnapshotId == barrier.snapshotId()) : "Unexpected barrier, expected was " + this.pendingSnapshotId + ", but barrier was " + barrier.snapshotId() + ", this=" + this;
                        this.hasReachedBarrier = true;
                    } else {
                        this.mapWriter.put((Map.Entry)o);
                    }
                });
                if (result.isDone()) {
                    this.inputIsDone = true;
                }
                if (result.isMadeProgress()) {
                    this.progTracker.madeProgress();
                    this.state = State.FLUSH;
                    this.stateMachineStep();
                }
                return;
            }
            case FLUSH: {
                this.progTracker.notDone();
                CompletableFuture<Void> future = new CompletableFuture<Void>();
                future.whenComplete((BiConsumer)ExceptionUtil.withTryCatch(this.logger, (r, t) -> {
                    if (t != null) {
                        this.logger.severe("Error writing to snapshot map '" + this.currMapName() + "'", t);
                        this.snapshotContext.reportError((Throwable)t);
                    }
                    this.numActiveFlushes.decrementAndGet();
                }));
                if (this.mapWriter.tryFlushAsync(future)) {
                    this.progTracker.madeProgress();
                    this.numActiveFlushes.incrementAndGet();
                    this.state = this.inputIsDone ? State.DONE : (this.hasReachedBarrier ? State.REACHED_BARRIER : State.DRAIN);
                }
                return;
            }
            case REACHED_BARRIER: {
                this.progTracker.notDone();
                if (this.numActiveFlushes.get() == 0) {
                    this.snapshotContext.snapshotDoneForTasklet();
                    ++this.pendingSnapshotId;
                    this.mapWriter.setMapName(this.currMapName());
                    this.state = this.inputIsDone ? State.DONE : State.DRAIN;
                    this.hasReachedBarrier = false;
                }
                return;
            }
            case DONE: {
                if (this.numActiveFlushes.get() != 0) {
                    this.progTracker.notDone();
                }
                this.snapshotContext.taskletDone(this.pendingSnapshotId - 1L, this.isHigherPrioritySource);
                return;
            }
        }
        throw new JetException("Unexpected state: " + (Object)((Object)this.state));
    }

    String currMapName() {
        return SnapshotRepository.snapshotDataMapName(this.jobId, this.pendingSnapshotId, this.vertexName);
    }

    public String toString() {
        return StoreSnapshotTasklet.class.getSimpleName() + '{' + this.vertexName + '}';
    }

    static enum State {
        DRAIN,
        FLUSH,
        REACHED_BARRIER,
        DONE;

    }
}

