/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.util.concurrent.MPSCQueue;
import com.hazelcast.jet.impl.execution.BroadcastItem;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.OutboundCollector;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.ObjectWithPartitionId;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.nio.BufferObjectDataInput;
import com.hazelcast.util.concurrent.IdleStrategy;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;

public class ReceiverTasklet
implements Tasklet {
    public static final ILogger LOG = Logger.getLogger(ReceiverTasklet.class);
    static final int COMPRESSED_SEQ_UNIT_LOG2 = 16;
    static final int INITIAL_RECEIVE_WINDOW_COMPRESSED = 800;
    private final int rwinMultiplier;
    private final double flowControlPeriodNs;
    private final Queue<BufferObjectDataInput> incoming = new MPSCQueue((IdleStrategy)null);
    private final ProgressTracker tracker = new ProgressTracker();
    private final ArrayDeque<ObjWithPtionIdAndSize> inbox = new ArrayDeque();
    private final OutboundCollector collector;
    private boolean receptionDone;
    private final AtomicLong itemsInCounter = new AtomicLong();
    private final AtomicLong bytesInCounter = new AtomicLong();
    private volatile long ackedSeq;
    private volatile int numWaitingInInbox;
    private int receiveWindowCompressed;
    private int prevAckedSeqCompressed;
    private long prevTimestamp;

    public ReceiverTasklet(OutboundCollector collector, int rwinMultiplier, int flowControlPeriodMs) {
        this.collector = collector;
        this.rwinMultiplier = rwinMultiplier;
        this.flowControlPeriodNs = TimeUnit.MILLISECONDS.toNanos(flowControlPeriodMs);
        this.receiveWindowCompressed = 800;
    }

    @Override
    @Nonnull
    public ProgressState call() {
        ObjWithPtionIdAndSize o;
        if (this.receptionDone) {
            return this.collector.offerBroadcast(DoneItem.DONE_ITEM);
        }
        this.tracker.reset();
        this.tracker.notDone();
        this.tryFillInbox();
        while ((o = this.inbox.peek()) != null) {
            ProgressState outcome;
            Object item = o.getItem();
            if (item == DoneItem.DONE_ITEM) {
                this.receptionDone = true;
                this.inbox.remove();
                assert (this.inbox.peek() == null) : "Found something in the queue beyond the DONE_ITEM: " + this.inbox.remove();
                break;
            }
            ProgressState progressState = outcome = item instanceof BroadcastItem ? this.collector.offerBroadcast((BroadcastItem)item) : this.collector.offer(item, o.getPartitionId());
            if (!outcome.isDone()) {
                this.tracker.madeProgress(outcome.isMadeProgress());
                break;
            }
            this.tracker.madeProgress();
            this.inbox.remove();
            this.ackItem(o.estimatedMemoryFootprint);
        }
        this.numWaitingInInbox = this.inbox.size();
        return this.tracker.toProgressState();
    }

    void receiveStreamPacket(BufferObjectDataInput packetInput) {
        this.incoming.add(packetInput);
    }

    public int updateAndGetSendSeqLimitCompressed() {
        return this.updateAndGetSendSeqLimitCompressed(System.nanoTime());
    }

    int updateAndGetSendSeqLimitCompressed(long timestampNow) {
        boolean hadPrevStats = this.prevTimestamp != 0L || this.prevAckedSeqCompressed != 0;
        long ackTimeDelta = timestampNow - this.prevTimestamp;
        this.prevTimestamp = timestampNow;
        int ackedSeqCompressed = ReceiverTasklet.compressSeq(this.ackedSeq);
        int ackedSeqCompressedDelta = ackedSeqCompressed - this.prevAckedSeqCompressed;
        this.prevAckedSeqCompressed = ackedSeqCompressed;
        if (hadPrevStats) {
            double ackedSeqsPerAckPeriod = this.flowControlPeriodNs * (double)ackedSeqCompressedDelta / (double)ackTimeDelta;
            int targetRwin = this.rwinMultiplier * (int)Math.ceil(ackedSeqsPerAckPeriod);
            int rwinDiff = targetRwin - this.receiveWindowCompressed;
            int numWaitingInInbox = this.numWaitingInInbox;
            if (numWaitingInInbox == 0 && rwinDiff < 0) {
                rwinDiff = 0;
            }
            this.receiveWindowCompressed += rwinDiff / 2;
            LoggingUtil.logFinest(LOG, "receiveWindowCompressed=%d", this.receiveWindowCompressed);
        }
        return ackedSeqCompressed + this.receiveWindowCompressed;
    }

    long ackItem(long itemWeight) {
        return this.ackedSeq += itemWeight;
    }

    void setNumWaitingInInbox(int value) {
        this.numWaitingInInbox = value;
    }

    public String toString() {
        return "ReceiverTasklet";
    }

    static int compressSeq(long seq) {
        return (int)(seq >> 16);
    }

    static long estimatedMemoryFootprint(long itemBlobSize) {
        int inboxSlot = 4;
        int objPtionAndSenderIdHeader = 16;
        int itemField = 4;
        int itemObjHeader = 16;
        int partitionIdField = 4;
        int senderIdField = 4;
        int estimatedMemoryFootprintField = 8;
        int overhead = 56;
        return 56L + itemBlobSize;
    }

    private void tryFillInbox() {
        try {
            BufferObjectDataInput received;
            long totalBytes = 0L;
            long totalItems = 0L;
            while ((received = this.incoming.poll()) != null) {
                int itemCount = received.readInt();
                for (int i = 0; i < itemCount; ++i) {
                    int mark = received.position();
                    Object item = received.readObject();
                    int itemSize = received.position() - mark;
                    this.inbox.add(new ObjWithPtionIdAndSize(item, received.readInt(), itemSize));
                }
                totalItems += (long)itemCount;
                totalBytes += (long)received.position();
                received.close();
                this.tracker.madeProgress();
            }
            Util.lazyAdd(this.bytesInCounter, totalBytes);
            Util.lazyAdd(this.itemsInCounter, totalItems);
        }
        catch (IOException e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    public AtomicLong getItemsInCounter() {
        return this.itemsInCounter;
    }

    public AtomicLong getBytesInCounter() {
        return this.bytesInCounter;
    }

    private static class ObjWithPtionIdAndSize
    extends ObjectWithPartitionId {
        final long estimatedMemoryFootprint;

        ObjWithPtionIdAndSize(Object item, int partitionId, int itemBlobSize) {
            super(item, partitionId);
            this.estimatedMemoryFootprint = ReceiverTasklet.estimatedMemoryFootprint(itemBlobSize);
        }
    }
}

