/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.amqp.server;

import com.caucho.amqp.common.AmqpReceiverLink;
import com.caucho.amqp.io.AmqpReader;
import com.caucho.amqp.io.FrameTransfer;
import com.caucho.amqp.io.MessageHeader;
import com.caucho.message.broker.BrokerSender;
import com.caucho.message.broker.SenderSettleHandler;
import com.caucho.util.CurrentTime;
import com.caucho.vfs.TempBuffer;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class AmqpServerReceiverLink
extends AmqpReceiverLink {
    private static final Logger log = Logger.getLogger(AmqpServerReceiverLink.class.getName());
    private final BrokerSender _sender;
    private final SenderSettleHandler _flowHandler;

    public AmqpServerReceiverLink(String name, String address, BrokerSender sender) {
        super(name, address);
        this._sender = sender;
        this._flowHandler = new FlowSettleHandler();
        this.setPrefetch(sender.getPrefetch());
    }

    @Override
    protected void onTransfer(FrameTransfer transfer, AmqpReader ain) throws IOException {
        super.onTransfer(transfer, ain);
        boolean isSettled = transfer.isSettled();
        long desc = ain.peekDescriptor();
        boolean isDurable = false;
        int priority = -1;
        long expireTime = 0L;
        if (desc == 112L) {
            MessageHeader header = new MessageHeader();
            header.read(ain);
            isDurable = header.isDurable();
            priority = header.getPriority();
            long ttl = header.getTimeToLive();
            if (ttl >= 0L) {
                expireTime = ttl + CurrentTime.getCurrentTime();
            }
        }
        int len = ain.getFrameAvailable();
        TempBuffer tBuf = TempBuffer.allocate();
        ain.read(tBuf.getBuffer(), 0, len);
        long xid = 0L;
        long mid = this._sender.nextMessageId();
        SenderSettleHandler handler = !isSettled ? new MessageSettleHandler(mid) : this._flowHandler;
        if (log.isLoggable(Level.FINER)) {
            log.finer(this + " onTransfer(" + mid + ",len=" + len + (isSettled ? ",settled" : "") + ")");
        }
        this._sender.message(xid, mid, isDurable, priority, expireTime, tBuf.getBuffer(), 0, len, tBuf, handler);
    }

    @Override
    protected int getPrefetchAvailable() {
        return this._sender.getPrefetch();
    }

    class MessageSettleHandler
    extends FlowSettleHandler {
        private final long _deliveryId;

        MessageSettleHandler(long deliveryId) {
            this._deliveryId = deliveryId;
        }

        @Override
        public boolean isSettled() {
            return false;
        }

        @Override
        public void onAccepted(long mid) {
            AmqpServerReceiverLink.this.getSession().accepted(this._deliveryId);
        }

        @Override
        public void onRejected(long mid, String msg) {
            AmqpServerReceiverLink.this.getSession().rejected(this._deliveryId, msg);
        }
    }

    class FlowSettleHandler
    implements SenderSettleHandler {
        FlowSettleHandler() {
        }

        @Override
        public boolean isSettled() {
            return true;
        }

        @Override
        public void onAccepted(long mid) {
            AmqpServerReceiverLink.this.updateTake();
        }

        @Override
        public void onRejected(long mid, String msg) {
            AmqpServerReceiverLink.this.updateTake();
        }
    }
}

