package org.apache.activemq.artemis.protocol.amqp.proton;

import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.engine.Delivery;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.class */
public class AMQPLargeMessageReader implements MessageReader {
    private final ProtonAbstractReceiver serverReceiver;
    private AMQPLargeMessage currentMessage;
    private DeliveryAnnotations deliveryAnnotations;
    private boolean closed = true;

    public AMQPLargeMessageReader(ProtonAbstractReceiver protonAbstractReceiver) {
        this.serverReceiver = protonAbstractReceiver;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageReader
    public DeliveryAnnotations getDeliveryAnnotations() {
        return this.deliveryAnnotations;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageReader
    public void close() {
        if (this.closed) {
            return;
        }
        if (this.currentMessage != null) {
            try {
                this.currentMessage.deleteFile();
            } catch (Throwable th) {
                ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(th);
            } finally {
                this.currentMessage = null;
            }
        }
        this.deliveryAnnotations = null;
        this.closed = true;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageReader
    public AMQPLargeMessageReader open() {
        if (!this.closed) {
            throw new IllegalStateException("Reader was not closed before call to open.");
        }
        this.closed = false;
        return this;
    }

    @Override // org.apache.activemq.artemis.protocol.amqp.proton.MessageReader
    public Message readBytes(Delivery delivery) throws Exception {
        AMQPLargeMessage aMQPLargeMessage;
        if (this.closed) {
            throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed");
        }
        ReadableBuffer recv = delivery.getLink().recv();
        if (this.currentMessage == null) {
            AMQPSessionCallback sessionSPI = this.serverReceiver.getSessionContext().getSessionSPI();
            long generateID = sessionSPI.getStorageManager().generateID();
            this.currentMessage = new AMQPLargeMessage(generateID, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager());
            this.currentMessage.parseHeader(recv);
            sessionSPI.getStorageManager().largeMessageCreated(generateID, this.currentMessage);
        }
        this.currentMessage.addBytes(recv);
        if (delivery.isPartial()) {
            aMQPLargeMessage = null;
        } else {
            this.currentMessage.releaseResources(this.serverReceiver.getConnection().isLargeMessageSync(), true);
            aMQPLargeMessage = this.currentMessage;
            this.currentMessage = null;
            this.deliveryAnnotations = aMQPLargeMessage.getDeliveryAnnotations();
        }
        return aMQPLargeMessage;
    }
}
