/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.PulsarDestination;
import com.datastax.oss.pulsar.jms.PulsarMessage;
import com.datastax.oss.pulsar.jms.PulsarSession;
import com.datastax.oss.pulsar.jms.PulsarTemporaryDestination;
import com.datastax.oss.pulsar.jms.Utils;
import com.datastax.oss.pulsar.jms.selectors.SelectorSupport;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSConsumer;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarMessageConsumer
implements MessageConsumer,
TopicSubscriber,
QueueReceiver {
    private static final Logger log = LoggerFactory.getLogger(PulsarMessageConsumer.class);
    final String subscriptionName;
    private final PulsarSession session;
    private final PulsarDestination destination;
    private final SelectorSupport selectorSupport;
    private final boolean noLocal;
    private Consumer<byte[]> consumer;
    private MessageListener listener;
    private final SubscriptionMode subscriptionMode;
    private final SubscriptionType subscriptionType;
    final boolean unregisterSubscriptionOnClose;
    private boolean closed;
    private boolean requestClose;

    public PulsarMessageConsumer(String subscriptionName, PulsarDestination destination, PulsarSession session, SubscriptionMode subscriptionMode, SubscriptionType subscriptionType, String selector, boolean unregisterSubscriptionOnClose, boolean noLocal) throws JMSException {
        PulsarTemporaryDestination dest;
        this.noLocal = noLocal;
        session.checkNotClosed();
        if (destination == null) {
            throw new InvalidDestinationException("Invalid destination");
        }
        if (destination instanceof PulsarTemporaryDestination && (dest = (PulsarTemporaryDestination)destination).getSession() != session) {
            throw new JMSException("Cannot subscribe to a temporary destination not created but this session");
        }
        this.subscriptionName = subscriptionName;
        this.session = session;
        this.destination = destination;
        this.subscriptionMode = destination.isQueue() ? SubscriptionMode.Durable : subscriptionMode;
        this.subscriptionType = destination.isQueue() ? SubscriptionType.Shared : subscriptionType;
        this.selectorSupport = SelectorSupport.build(selector, subscriptionType == SubscriptionType.Exclusive || session.getFactory().isEnableClientSideEmulation());
        this.unregisterSubscriptionOnClose = unregisterSubscriptionOnClose;
        if (noLocal && subscriptionType != SubscriptionType.Exclusive && !session.getFactory().isEnableClientSideEmulation()) {
            throw new IllegalStateException("noLocal is not enabled by default with subscriptionType " + subscriptionType + ", please set jms.enableClientSideEmulation=true");
        }
    }

    public PulsarMessageConsumer subscribe() throws JMSException {
        if (this.destination.isQueue()) {
            this.session.getFactory().ensureQueueSubscription(this.destination);
        } else {
            this.getConsumer();
        }
        this.session.registerConsumer(this);
        return this;
    }

    synchronized Consumer<byte[]> getConsumer() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("Consumer is closed");
        }
        if (this.consumer == null) {
            this.consumer = this.session.getFactory().createConsumer(this.destination, this.subscriptionName, this.session.getAcknowledgeMode(), this.subscriptionMode, this.subscriptionType);
        }
        return this.consumer;
    }

    public synchronized String getMessageSelector() throws JMSException {
        this.checkNotClosed();
        return this.selectorSupport != null ? this.selectorSupport.getSelector() : null;
    }

    public synchronized MessageListener getMessageListener() throws JMSException {
        this.checkNotClosed();
        return this.listener;
    }

    synchronized void checkNotClosed() throws JMSException {
        this.session.checkNotClosed();
        if (this.closed || this.requestClose) {
            throw new IllegalStateException("This consumer is closed");
        }
    }

    public synchronized void setMessageListener(MessageListener listener) throws JMSException {
        this.checkNotClosed();
        this.listener = listener;
        this.session.ensureListenerThread();
    }

    public javax.jms.Message receive() throws JMSException {
        return this.receiveWithTimeoutAndValidateType(Long.MAX_VALUE, null);
    }

    public javax.jms.Message receive(long timeout) throws JMSException {
        return this.receiveWithTimeoutAndValidateType(timeout, null);
    }

    private synchronized javax.jms.Message receiveWithTimeoutAndValidateType(long timeout, Class expectedType) throws JMSException {
        this.checkNotClosed();
        if (this.listener != null) {
            throw new IllegalStateException("cannot receive if you have a messageListener");
        }
        int acquireConnectionStartTime = timeout == Long.MAX_VALUE ? Integer.MAX_VALUE : (int)timeout;
        int stepTimeout = timeout < 100L ? (int)timeout : 100;
        long start = System.currentTimeMillis();
        return this.session.executeOperationIfConnectionStarted(() -> {
            do {
                javax.jms.Message result;
                if ((result = (javax.jms.Message)this.session.executeCriticalOperation(() -> {
                    try {
                        Consumer<byte[]> consumer = this.getConsumer();
                        Message message = consumer.receive(stepTimeout, TimeUnit.MILLISECONDS);
                        if (message == null) {
                            return null;
                        }
                        return this.handleReceivedMessage((Message<byte[]>)message, expectedType, null, this.noLocal);
                    }
                    catch (Exception err) {
                        throw Utils.handleException(err);
                    }
                })) == null) continue;
                return result;
            } while (System.currentTimeMillis() - start < timeout && !this.session.isClosed());
            return null;
        }, acquireConnectionStartTime);
    }

    public javax.jms.Message receiveNoWait() throws JMSException {
        return this.receive(1L);
    }

    private void skipMessage(Message<byte[]> message) throws JMSException {
        if (this.subscriptionType == SubscriptionType.Exclusive || this.session.getFactory().isAcknowledgeRejectedMessages()) {
            if (this.session.getTransaction() != null) {
                this.consumer.acknowledgeAsync(message.getMessageId(), this.session.getTransaction());
            } else {
                this.consumer.acknowledgeAsync(message.getMessageId());
            }
        } else {
            log.info("nAck filtered msg {}", (Object)message.getMessageId());
            this.consumer.negativeAcknowledge(message);
        }
    }

    private PulsarMessage handleReceivedMessage(Message<byte[]> message, Class expectedType, java.util.function.Consumer<PulsarMessage> listenerCode, boolean noLocalFilter) throws JMSException, PulsarClientException {
        String senderConnectionID;
        PulsarMessage result = PulsarMessage.decode(this, message);
        Consumer<byte[]> consumer = this.getConsumer();
        if (expectedType != null && !result.isBodyAssignableTo(expectedType)) {
            log.info("negativeAcknowledge for message {} that cannot be converted to {}", message, (Object)expectedType);
            consumer.negativeAcknowledge(message);
            throw new MessageFormatException("The message (" + result.messageType() + "," + result + ",) cannot be converted to a " + expectedType);
        }
        if (this.selectorSupport != null && !this.selectorSupport.matches(result)) {
            log.info("msg {} does not match selector {}", (Object)result, (Object)this.selectorSupport.getSelector());
            this.skipMessage(message);
            return null;
        }
        if (noLocalFilter && (senderConnectionID = result.getStringProperty("JMSConnectionID")) != null && senderConnectionID.equals(this.session.getConnection().getConnectionId())) {
            log.info("msg {} was generated from this connection {}", (Object)result, (Object)senderConnectionID);
            this.skipMessage(message);
            return null;
        }
        if (result.getJMSExpiration() > 0L && System.currentTimeMillis() >= result.getJMSExpiration()) {
            log.info("msg {} expired at {}", (Object)result, (Object)Instant.ofEpochMilli(result.getJMSExpiration()));
            this.skipMessage(message);
            return null;
        }
        this.session.registerUnacknowledgedMessage(result);
        if (listenerCode != null) {
            try {
                listenerCode.accept(result);
            }
            catch (Throwable t) {
                log.error("Listener thrown error, calling negativeAcknowledge", t);
                consumer.negativeAcknowledge(message);
                throw Utils.handleException(t);
            }
            if (result.isNegativeAcked()) {
                return null;
            }
        }
        if (this.session.getTransacted()) {
            Utils.get(consumer.acknowledgeAsync(message.getMessageId(), this.session.getTransaction()));
        } else if (this.session.getAcknowledgeMode() == 1) {
            consumer.acknowledge(message);
        } else if (this.session.getAcknowledgeMode() == 3) {
            consumer.acknowledgeAsync(message).whenComplete((m, ex) -> {
                if (ex != null) {
                    log.error("Cannot acknowledge message {} {}", (Object)message, ex);
                }
            });
        }
        if (this.session.getAcknowledgeMode() != 2) {
            this.session.unregisterUnacknowledgedMessage(result);
        }
        if (this.requestClose) {
            this.closeInternal();
        }
        return result;
    }

    public synchronized void close() throws JMSException {
        if (Utils.isOnMessageListener(this.session, this)) {
            this.requestClose = true;
            return;
        }
        if (this.closed) {
            return;
        }
        this.closed = true;
        if (this.consumer == null) {
            return;
        }
        this.session.executeCriticalOperation(() -> {
            try {
                this.consumer.close();
                this.session.removeConsumer(this);
                return null;
            }
            catch (Exception err) {
                throw Utils.handleException(err);
            }
        });
    }

    public String toString() {
        return "PulsarConsumer{subscriptionName=" + this.subscriptionName + ", destination=" + this.destination + '}';
    }

    public synchronized Topic getTopic() throws JMSException {
        this.checkNotClosed();
        if (this.destination.isTopic()) {
            return (Topic)this.destination;
        }
        throw new JMSException("This consumer has been created on a Queue");
    }

    public synchronized Queue getQueue() throws JMSException {
        this.checkNotClosed();
        if (this.destination.isQueue()) {
            return (Queue)this.destination;
        }
        throw new JMSException("This consumer has been created on a Topic");
    }

    public synchronized boolean getNoLocal() throws JMSException {
        this.checkNotClosed();
        return this.noLocal;
    }

    public JMSConsumer asJMSConsumer() {
        return new JMSConsumer(){

            public String getMessageSelector() {
                return Utils.runtimeException(() -> PulsarMessageConsumer.this.getMessageSelector());
            }

            public MessageListener getMessageListener() throws JMSRuntimeException {
                return Utils.runtimeException(() -> PulsarMessageConsumer.this.getMessageListener());
            }

            public void setMessageListener(MessageListener listener) throws JMSRuntimeException {
                Utils.runtimeException(() -> PulsarMessageConsumer.this.setMessageListener(listener));
            }

            public javax.jms.Message receive() {
                return Utils.runtimeException(() -> PulsarMessageConsumer.this.receive());
            }

            public javax.jms.Message receive(long timeout) {
                return Utils.runtimeException(() -> PulsarMessageConsumer.this.receive(timeout));
            }

            public javax.jms.Message receiveNoWait() {
                return Utils.runtimeException(() -> PulsarMessageConsumer.this.receiveNoWait());
            }

            public void close() {
                Utils.runtimeException(() -> PulsarMessageConsumer.this.close());
            }

            public <T> T receiveBody(Class<T> c) {
                return (T)Utils.runtimeException(() -> {
                    javax.jms.Message msg = PulsarMessageConsumer.this.receiveWithTimeoutAndValidateType(Long.MAX_VALUE, c);
                    return msg == null ? null : msg.getBody(c);
                });
            }

            public <T> T receiveBody(Class<T> c, long timeout) {
                return (T)Utils.runtimeException(() -> {
                    javax.jms.Message msg = PulsarMessageConsumer.this.receiveWithTimeoutAndValidateType(timeout, c);
                    return msg == null ? null : msg.getBody(c);
                });
            }

            public <T> T receiveBodyNoWait(Class<T> c) {
                return (T)Utils.runtimeException(() -> {
                    javax.jms.Message msg = PulsarMessageConsumer.this.receiveWithTimeoutAndValidateType(1L, c);
                    return msg == null ? null : msg.getBody(c);
                });
            }
        };
    }

    synchronized void acknowledge(Message<byte[]> receivedPulsarMessage, PulsarMessage message) throws JMSException {
        Consumer<byte[]> consumer = this.getConsumer();
        try {
            consumer.acknowledge(receivedPulsarMessage);
            this.session.unregisterUnacknowledgedMessage(message);
        }
        catch (PulsarClientException err) {
            throw Utils.handleException(err);
        }
    }

    synchronized void runListener(int timeout) {
        if (this.closed || this.listener == null) {
            return;
        }
        Utils.executeMessageListenerInSessionContext(this.session, this, () -> {
            if (this.closed) {
                return;
            }
            try {
                Consumer<byte[]> consumer = this.getConsumer();
                Message message = consumer.receive(timeout, TimeUnit.MILLISECONDS);
                if (message == null) {
                    return;
                }
                PulsarMessage pulsarMessage = this.handleReceivedMessage((Message<byte[]>)message, null, pmessage -> this.listener.onMessage((javax.jms.Message)pmessage), this.noLocal);
            }
            catch (PulsarClientException.AlreadyClosedException closed) {
                log.error("Error while receiving message con Closed consumer {}", (Object)this);
            }
            catch (JMSException | PulsarClientException err) {
                log.error("Error while receiving message con consumer {}", (Object)this, (Object)err);
                this.session.onError(err);
            }
        });
    }

    public void closeInternal() throws JMSException {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.requestClose = false;
        try {
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
        }
        catch (Exception err) {
            throw Utils.handleException(err);
        }
    }

    public void negativeAck(Message<byte[]> message) {
        if (this.consumer != null) {
            this.consumer.negativeAcknowledge(message);
        }
    }

    PulsarSession getSession() {
        return this.session;
    }

    Consumer<byte[]> getInternalConsumer() {
        return this.consumer;
    }

    Destination getDestination() {
        return this.destination;
    }

    public SubscriptionType getSubscriptionType() {
        return this.subscriptionType;
    }
}

