/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.networking.OutboundFrame;
import com.hazelcast.jet.impl.Networking;
import com.hazelcast.jet.impl.execution.DoneItem;
import com.hazelcast.jet.impl.execution.InboundEdgeStream;
import com.hazelcast.jet.impl.execution.ReceiverTasklet;
import com.hazelcast.jet.impl.execution.Tasklet;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ObjectWithPartitionId;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.BufferObjectDataOutput;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.Packet;
import com.hazelcast.spi.NodeEngine;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import javax.annotation.Nonnull;

/**
 * Tasklet that drains items from an {@link InboundEdgeStream}, serializes
 * them into a reusable output buffer and writes the result as a
 * {@code JET}-typed {@link Packet} to the connection of a destination member.
 *
 * <p>Layout of the bytes handed to each packet, as produced by this class:
 * <ol>
 *   <li>the stream packet header written once in the constructor;</li>
 *   <li>an {@code int} holding the number of items in the packet;</li>
 *   <li>for each item: the serialized object followed by an {@code int}
 *       partition id ({@code -1} when the item carried none).</li>
 * </ol>
 *
 * <p>Sending is throttled by {@link #isWithinLimit(long, int)}, which compares
 * the running {@code sentSeq} against the limit installed through
 * {@link #setSendSeqLimitCompressed(int)}.
 */
public class SenderTasklet
implements Tasklet {
    /** Size of the slot, right after the header, that holds the per-packet item count. */
    private static final int ITEM_COUNT_SLOT_BYTES = Integer.BYTES;

    private final Connection connection;
    /** Items drained from the inbound stream but not yet serialized. */
    private final Queue<Object> inbox = new ArrayDeque<>();
    private final ProgressTracker progTracker = new ProgressTracker();
    private final InboundEdgeStream inboundEdgeStream;
    /** Reused for every packet; the header at its start is written only once. */
    private final BufferObjectDataOutput outputBuffer;
    /** Buffer position immediately after the stream packet header. */
    private final int bufPosPastHeader;
    /** Once the buffer position reaches this value no further item is appended. */
    private final int packetSizeLimit;
    /** Set once the inbound stream has reported it is done. */
    private boolean instreamExhausted;
    /** Running total of the estimated memory footprint of all items serialized so far. */
    private long sentSeq;
    // NOTE(review): declared volatile, so presumably set from a thread other
    // than the one running call() -- confirm against the caller of the setter.
    private volatile int sendSeqLimitCompressed;

    /**
     * Creates the tasklet and pre-writes the stream packet header into the
     * output buffer so that it can be reused by every packet.
     *
     * @param inboundEdgeStream   source of the items to send; its ordinal is part of the header
     * @param nodeEngine          used to look up the member connection and to create the buffer
     * @param destinationAddress  address of the member the packets are written to
     * @param executionId         execution id stored in the packet header
     * @param destinationVertexId destination vertex id stored in the packet header
     * @param packetSizeLimit     buffer position at which filling of a packet stops
     */
    public SenderTasklet(InboundEdgeStream inboundEdgeStream, NodeEngine nodeEngine, Address destinationAddress, long executionId, int destinationVertexId, int packetSizeLimit) {
        this.inboundEdgeStream = inboundEdgeStream;
        this.packetSizeLimit = packetSizeLimit;
        this.connection = Util.getMemberConnection(nodeEngine, destinationAddress);
        this.outputBuffer = Util.createObjectDataOutput(nodeEngine);
        Util.uncheckRun(() -> this.outputBuffer.write(Networking.createStreamPacketHeader(nodeEngine, executionId, destinationVertexId, inboundEdgeStream.ordinal())));
        this.bufPosPastHeader = this.outputBuffer.position();
    }

    /**
     * Performs one round of work: refills the inbox if it is empty, then
     * serializes as many queued items as the size and sequence limits allow
     * and writes them to the connection as a single packet.
     *
     * @return the progress state accumulated during this invocation
     * @throws IllegalStateException if the connection refuses the packet
     */
    @Override
    @Nonnull
    public ProgressState call() {
        this.progTracker.reset();
        this.tryFillInbox();
        if (this.progTracker.isDone()) {
            return this.progTracker.toProgressState();
        }
        if (this.tryFillOutputBuffer()) {
            this.progTracker.madeProgress();
            OutboundFrame frame = new Packet(this.outputBuffer.toByteArray()).setPacketType(Packet.Type.JET);
            // The items in this packet have already been removed from the inbox
            // and counted in sentSeq; if the write is rejected they would be lost
            // without a trace, so fail loudly instead of reporting progress.
            if (!this.connection.write(frame)) {
                throw new IllegalStateException("Connection write failed in " + this);
            }
        }
        return this.progTracker.toProgressState();
    }

    /**
     * Drains the inbound stream into the inbox, but only when the inbox is
     * empty and the stream is not yet exhausted. When the stream reports it is
     * done, a {@code DONE_ITEM} (partition id {@code -1}) is queued after the
     * drained items.
     */
    private void tryFillInbox() {
        if (!this.inbox.isEmpty()) {
            // Still have queued items to send: not done, and no need to drain more.
            this.progTracker.notDone();
            return;
        }
        if (this.instreamExhausted) {
            // Nothing queued and nothing more will arrive; leave the tracker as is.
            return;
        }
        this.progTracker.notDone();
        ProgressState result = this.inboundEdgeStream.drainTo(this.inbox::add);
        this.progTracker.madeProgress(result.isMadeProgress());
        this.instreamExhausted = result.isDone();
        if (this.instreamExhausted) {
            this.inbox.add(new ObjectWithPartitionId(DoneItem.DONE_ITEM, -1));
        }
    }

    /**
     * Serializes queued items into the output buffer, after the header and the
     * item-count slot, until the buffer position reaches the packet size
     * limit, the sequence limit is exceeded, or the inbox is empty. The size
     * check happens before each item is written, so the final position may
     * exceed {@code packetSizeLimit} by up to one item.
     *
     * @return {@code true} if at least one item was written
     */
    private boolean tryFillOutputBuffer() {
        try {
            Object item;
            // Skip the header and leave room for the item count, filled in below.
            this.outputBuffer.position(this.bufPosPastHeader + ITEM_COUNT_SLOT_BYTES);
            int writtenCount = 0;
            while (this.outputBuffer.position() < this.packetSizeLimit
                    && SenderTasklet.isWithinLimit(this.sentSeq, this.sendSeqLimitCompressed)
                    && (item = this.inbox.poll()) != null) {
                ObjectWithPartitionId itemWithpId = item instanceof ObjectWithPartitionId
                        ? (ObjectWithPartitionId) item
                        : new ObjectWithPartitionId(item, -1);
                int mark = this.outputBuffer.position();
                this.outputBuffer.writeObject(itemWithpId.getItem());
                // Only the serialized object counts towards sentSeq; the
                // partition id written next is not included.
                this.sentSeq += ReceiverTasklet.estimatedMemoryFootprint(this.outputBuffer.position() - mark);
                this.outputBuffer.writeInt(itemWithpId.getPartitionId());
                ++writtenCount;
            }
            this.outputBuffer.writeInt(this.bufPosPastHeader, writtenCount);
            return writtenCount > 0;
        }
        catch (IOException e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /**
     * Installs a new send limit, in the compressed form that
     * {@link #isWithinLimit(long, int)} compares against.
     *
     * @param sendSeqLimitCompressed the new compressed sequence limit
     */
    public void setSendSeqLimitCompressed(int sendSeqLimitCompressed) {
        this.sendSeqLimitCompressed = sendSeqLimitCompressed;
    }

    @Override
    public String toString() {
        return "SenderTasklet " + this.connection.getEndPoint();
    }

    /**
     * Tells whether more data may be sent given the current sequence and limit.
     *
     * @param sentSeq                uncompressed running sequence of sent data
     * @param sendSeqLimitCompressed limit, already in compressed form
     * @return {@code true} if the compressed {@code sentSeq} does not exceed the limit
     */
    static boolean isWithinLimit(long sentSeq, int sendSeqLimitCompressed) {
        // NOTE(review): compared via subtraction rather than a direct <=,
        // presumably so the result stays correct if the compressed sequence
        // wraps around -- confirm against ReceiverTasklet.compressSeq.
        return ReceiverTasklet.compressSeq(sentSeq) - sendSeqLimitCompressed <= 0;
    }
}

