/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.impl.execution.BroadcastItem;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.BitSet;
import java.util.Map;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;

public class OutboxImpl
implements Outbox {
    private final OutboundCollector[] outstreams;
    private final ProgressTracker progTracker;
    private final SerializationService serializationService;
    private final int batchSize;
    private final int[] singleEdge = new int[]{0};
    private final int[] allEdges;
    private final int[] allEdgesAndSnapshot;
    private final int[] snapshotEdge;
    private final BitSet broadcastTracker;
    private Map.Entry<Data, Data> pendingSnapshotEntry;
    private int numRemainingInBatch;

    @SuppressFBWarnings(value={"EI_EXPOSE_REP"})
    public OutboxImpl(OutboundCollector[] outstreams, boolean hasSnapshot, ProgressTracker progTracker, SerializationService serializationService, int batchSize) {
        int[] nArray;
        this.outstreams = outstreams;
        this.progTracker = progTracker;
        this.serializationService = serializationService;
        this.batchSize = batchSize;
        Preconditions.checkPositive(batchSize, "batchSize must be positive");
        this.allEdges = IntStream.range(0, outstreams.length - (hasSnapshot ? 1 : 0)).toArray();
        this.allEdgesAndSnapshot = IntStream.range(0, outstreams.length).toArray();
        if (hasSnapshot) {
            int[] nArray2 = new int[1];
            nArray = nArray2;
            nArray2[0] = outstreams.length - 1;
        } else {
            nArray = null;
        }
        this.snapshotEdge = nArray;
        this.broadcastTracker = new BitSet(outstreams.length);
    }

    @Override
    public final int bucketCount() {
        return this.allEdges.length;
    }

    @Override
    public final boolean offer(int ordinal, @Nonnull Object item) {
        if (ordinal == -1) {
            return this.offer(this.allEdges, item);
        }
        if (ordinal == this.bucketCount()) {
            throw new IllegalArgumentException("Illegal edge ordinal: " + ordinal);
        }
        this.singleEdge[0] = ordinal;
        return this.offer(this.singleEdge, item);
    }

    @Override
    public final boolean offer(int[] ordinals, @Nonnull Object item) {
        if (this.numRemainingInBatch == 0) {
            return false;
        }
        assert (this.numRemainingInBatch > 0) : "numRemainingInBatch=" + this.numRemainingInBatch;
        --this.numRemainingInBatch;
        boolean done = true;
        for (int i = 0; i < ordinals.length; ++i) {
            if (this.broadcastTracker.get(i)) continue;
            ProgressState result = this.doOffer(this.outstreams[ordinals[i]], item);
            if (result.isMadeProgress()) {
                this.progTracker.madeProgress();
            }
            if (result.isDone()) {
                this.broadcastTracker.set(i);
                continue;
            }
            done = false;
        }
        if (done) {
            this.broadcastTracker.clear();
        }
        return done;
    }

    @Override
    public final boolean offer(@Nonnull Object item) {
        return this.offer(this.allEdges, item);
    }

    @Override
    public final boolean offerToSnapshot(@Nonnull Object key, @Nonnull Object value) {
        boolean success;
        if (this.snapshotEdge == null) {
            throw new IllegalStateException("Outbox does not have snapshot queue");
        }
        if (this.pendingSnapshotEntry == null) {
            Object sKey = this.serializationService.toData(key);
            Object sValue = this.serializationService.toData(value);
            this.pendingSnapshotEntry = Util.entry(sKey, sValue);
        }
        if (success = this.offer(this.snapshotEdge, this.pendingSnapshotEntry)) {
            this.pendingSnapshotEntry = null;
        }
        return success;
    }

    public void resetBatch() {
        this.numRemainingInBatch = this.batchSize;
    }

    private ProgressState doOffer(OutboundCollector collector, Object item) {
        if (item instanceof BroadcastItem) {
            return collector.offerBroadcast((BroadcastItem)item);
        }
        return collector.offer(item);
    }

    final boolean offerToEdgesAndSnapshot(Object item) {
        return this.offer(this.allEdgesAndSnapshot, item);
    }
}

