/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.pulsar.jms;

import com.datastax.oss.pulsar.jms.PulsarConnection;
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import com.datastax.oss.pulsar.jms.PulsarDestination;
import com.datastax.oss.pulsar.jms.PulsarMessage;
import com.datastax.oss.pulsar.jms.PulsarMessageConsumer;
import com.datastax.oss.pulsar.jms.PulsarMessageProducer;
import com.datastax.oss.pulsar.jms.PulsarQueue;
import com.datastax.oss.pulsar.jms.PulsarQueueBrowser;
import com.datastax.oss.pulsar.jms.PulsarTopic;
import com.datastax.oss.pulsar.jms.Utils;
import com.datastax.oss.pulsar.jms.messages.PulsarBytesMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarMapMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarObjectMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarSimpleMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarStreamMessage;
import com.datastax.oss.pulsar.jms.messages.PulsarTextMessage;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JMS {@link Session} implementation bound to a single {@link PulsarConnection}.
 *
 * <p>The same class also implements {@link QueueSession} and {@link TopicSession};
 * {@link #emulateLegacySession(boolean, boolean)} can disable one of the two domains so that
 * the queue-only or topic-only operations fail with an {@link IllegalStateException}.
 *
 * <p>For transacted sessions the Pulsar {@link Transaction} is created lazily by
 * {@link #getTransaction()} and discarded on {@link #commit()}, {@link #rollback()} and
 * {@link #close()}.
 */
public class PulsarSession implements Session, QueueSession, TopicSession {
    private static final Logger log = LoggerFactory.getLogger(PulsarSession.class);

    /** How many times {@link #startTransaction(PulsarConnection)} tries to obtain a transaction. */
    private static final int MAX_CREATE_TRANSACTION_TRIALS = 10;
    /** Pause between two attempts while the transaction coordinator is not available. */
    private static final long CREATE_TRANSACTION_RETRY_DELAY_MILLIS = 1000L;
    /** Pause of the listener loop when there is nothing to poll. */
    private static final long LISTENER_IDLE_SLEEP_MILLIS = 100L;
    /** Value handed to {@code PulsarMessageConsumer.runListener} on every listener loop iteration. */
    private static final int LISTENER_POLL_TIMEOUT_MILLIS = 100;

    private final PulsarConnection connection;
    private boolean jms20 = false;
    private final int sessionMode;
    private final boolean transacted;
    private boolean allowQueueOperations = true;
    private boolean allowTopicOperations = true;
    Transaction transaction;
    private MessageListener messageListener;
    private final Map<PulsarDestination, Producer<byte[]>> producers = new HashMap<PulsarDestination, Producer<byte[]>>();
    /** Read side guards operations that must not overlap with {@link #close()}, which takes the write side. */
    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    private final List<PulsarMessage> unackedMessages = new ArrayList<PulsarMessage>();
    private final Map<String, PulsarDestination> destinationBySubscription = new HashMap<String, PulsarDestination>();
    private volatile boolean closed;
    private volatile ListenerThread listenerThread;
    private final List<PulsarMessageConsumer> consumers = new CopyOnWriteArrayList<PulsarMessageConsumer>();

    /**
     * Creates a session on the given connection.
     *
     * @param sessionMode one of {@link Session#SESSION_TRANSACTED}, {@link Session#AUTO_ACKNOWLEDGE},
     *     {@link Session#CLIENT_ACKNOWLEDGE} or {@link Session#DUPS_OK_ACKNOWLEDGE}
     * @param connection the owning connection
     * @throws JMSException if the mode is not one of the values above, or if a transacted session
     *     is requested while transactions are not enabled on the connection factory
     */
    public PulsarSession(int sessionMode, PulsarConnection connection) throws JMSException {
        this.connection = connection;
        this.sessionMode = sessionMode;
        this.transacted = sessionMode == Session.SESSION_TRANSACTED;
        PulsarSession.validateSessionMode(sessionMode);
        if (this.transacted && !connection.getFactory().isEnableTransaction()) {
            throw new JMSException("Please enable transactions on PulsarConnectionFactory with enableTransaction=true");
        }
    }

    /**
     * Returns the current transaction, starting a new one on first use when the session is
     * transacted.
     *
     * @return the current transaction, or {@code null} for a non-transacted session
     * @throws JMSException if a transaction cannot be started
     */
    Transaction getTransaction() throws JMSException {
        if (this.transaction == null && this.transacted) {
            this.transaction = this.startTransaction(this.connection);
        }
        return this.transaction;
    }

    /**
     * Asks the Pulsar client for a new transaction, retrying while the transaction coordinator
     * is reported as not found.
     *
     * @param connection connection whose factory provides the Pulsar client
     * @return the newly created transaction, never {@code null}
     * @throws JMSException on any other failure, or when all the attempts are exhausted
     */
    private Transaction startTransaction(PulsarConnection connection) throws JMSException {
        Transaction transaction = null;
        int remainingTrials = MAX_CREATE_TRANSACTION_TRIALS;
        while (remainingTrials-- > 0) {
            // The nested try is intentional: whatever escapes the inner catch (including an
            // interrupted sleep) is still routed through Utils.handleException by the outer one.
            try {
                try {
                    transaction = (Transaction)connection.getFactory().getPulsarClient().newTransaction().build().get();
                    break;
                }
                catch (ExecutionException err) {
                    if (err.getCause() instanceof TransactionCoordinatorClientException.CoordinatorNotFoundException) {
                        log.info("Transaction service not available {}", err.getCause().getMessage());
                        Thread.sleep(CREATE_TRANSACTION_RETRY_DELAY_MILLIS);
                        continue;
                    }
                    throw Utils.handleException(err.getCause());
                }
            }
            catch (Exception err) {
                throw Utils.handleException(err);
            }
        }
        if (transaction == null) {
            throw new JMSException("Cannot create a Transaction in time");
        }
        return transaction;
    }

    /**
     * Rejects any session mode that is not one of the four standard JMS values.
     *
     * @param sessionMode the mode to validate
     * @throws JMSException if the mode is not supported
     */
    private static void validateSessionMode(int sessionMode) throws JMSException {
        switch (sessionMode) {
            case Session.SESSION_TRANSACTED:
            case Session.AUTO_ACKNOWLEDGE:
            case Session.CLIENT_ACKNOWLEDGE:
            case Session.DUPS_OK_ACKNOWLEDGE:
                break;
            default:
                throw new JMSException("Invalid sessionMode " + sessionMode);
        }
    }

    /** Returns the factory of the owning connection. */
    PulsarConnectionFactory getFactory() {
        return this.connection.getFactory();
    }

    /**
     * Creates an empty bytes message.
     *
     * @throws JMSException if the session or its connection is closed
     */
    public PulsarBytesMessage createBytesMessage() throws JMSException {
        this.checkNotClosed();
        return new PulsarBytesMessage();
    }

    /**
     * Creates an empty map message.
     *
     * @throws JMSException if the session or its connection is closed
     */
    public MapMessage createMapMessage() throws JMSException {
        this.checkNotClosed();
        return new PulsarMapMessage();
    }

    /**
     * Creates a map message initialised from the given body.
     *
     * @param body initial content handed to the {@link PulsarMapMessage} constructor
     * @throws JMSException if the session or its connection is closed, or if the message cannot
     *     be built
     */
    public MapMessage createMapMessage(Map<String, Object> body) throws JMSException {
        // Same closed-session guard as every other message factory of this class.
        this.checkNotClosed();
        return new PulsarMapMessage(body);
    }

    /**
     * Creates a message without a body.
     *
     * @throws JMSException if the session or its connection is closed
     */
    public Message createMessage() throws JMSException {
        this.checkNotClosed();
        return new PulsarSimpleMessage();
    }

    /**
     * Creates an object message without a payload.
     *
     * @throws JMSException if the session or its connection is closed
     */
    public ObjectMessage createObjectMessage() throws JMSException {
        this.checkNotClosed();
        return new PulsarObjectMessage();
    }

    /**
     * Creates an object message carrying the given payload.
     *
     * @param object the payload, set through {@code setObject}
     * @throws JMSException if the session or its connection is closed, or if the payload is rejected
     */
    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
        this.checkNotClosed();
        PulsarObjectMessage message = new PulsarObjectMessage();
        message.setObject(object);
        return message;
    }

    /**
     * Creates an empty stream message.
     *
     * @throws JMSException if the session or its connection is closed
     */
    public StreamMessage createStreamMessage() throws JMSException {
        this.checkNotClosed();
        return new PulsarStreamMessage();
    }

    /**
     * Creates a text message with a {@code null} text.
     *
     * @throws JMSException if the session or its connection is closed
     */
    public TextMessage createTextMessage() throws JMSException {
        this.checkNotClosed();
        // The cast selects the String constructor among the overloads.
        return new PulsarTextMessage((String)null);
    }

    /**
     * Creates a text message with the given text.
     *
     * @param text the message body
     * @throws JMSException if the session or its connection is closed
     */
    public TextMessage createTextMessage(String text) throws JMSException {
        this.checkNotClosed();
        return new PulsarTextMessage(text);
    }

    /**
     * Tells whether the session was created with {@link Session#SESSION_TRANSACTED}.
     *
     * @throws JMSException if the session or its connection is closed
     */
    public boolean getTransacted() throws JMSException {
        this.checkNotClosed();
        return this.transacted;
    }

    /**
     * Returns the session mode passed to the constructor.
     *
     * @throws JMSException if the session or its connection is closed
     */
    public int getAcknowledgeMode() throws JMSException {
        this.checkNotClosed();
        return this.sessionMode;
    }

    /**
     * Commits the current transaction, if one has been started, and forgets it so that the next
     * {@link #getTransaction()} starts a new one.
     *
     * @throws JMSException if the session is closed, is not transacted, the call is made from a
     *     context rejected by the {@link Utils} checks, or the commit fails
     */
    public void commit() throws JMSException {
        this.checkNotClosed();
        Utils.checkNotOnMessageListener(this);
        Utils.checkNotOnMessageProducer(this, null);
        if (!this.transacted) {
            throw new IllegalStateException("session is not transacted");
        }
        this.closeLock.readLock().lock();
        try {
            if (this.transaction != null) {
                Utils.get(this.transaction.commit());
                this.transaction = null;
            }
        }
        finally {
            this.closeLock.readLock().unlock();
        }
    }

    /**
     * Aborts the current transaction, if one has been started, and forgets it so that the next
     * {@link #getTransaction()} starts a new one.
     *
     * @throws JMSException if the session is closed, is not transacted, the call is made from a
     *     context rejected by the {@link Utils} checks, or the abort fails
     */
    public void rollback() throws JMSException {
        this.checkNotClosed();
        Utils.checkNotOnMessageListener(this);
        Utils.checkNotOnMessageProducer(this, null);
        if (!this.transacted) {
            throw new IllegalStateException("session is not transacted");
        }
        this.closeLock.readLock().lock();
        try {
            if (this.transaction != null) {
                Utils.get(this.transaction.abort());
            }
            this.transaction = null;
        }
        finally {
            this.closeLock.readLock().unlock();
        }
    }

    /**
     * Closes the session: forgets the unacknowledged messages, aborts the pending transaction,
     * closes every registered consumer, unregisters the session from the connection and waits
     * for the listener thread to terminate. Calling it on an already closed session only
     * repeats the unregistration.
     *
     * @throws JMSException if the call is made from a session callback, or if aborting the
     *     transaction or closing a consumer fails
     */
    public void close() throws JMSException {
        Utils.checkNotOnSessionCallback(this);
        this.closeLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.unackedMessages.clear();
            try {
                if (this.transacted && this.transaction != null) {
                    try {
                        Utils.get(this.transaction.abort());
                    }
                    finally {
                        this.transaction = null;
                    }
                }
            }
            finally {
                // The session is already flagged as closed, so a second close() would return
                // early: the consumers must be released now even if the abort above failed.
                for (PulsarMessageConsumer consumer : this.consumers) {
                    consumer.closeInternal();
                }
            }
        }
        finally {
            this.closeLock.writeLock().unlock();
            this.connection.unregisterSession(this);
        }
        // Work on a snapshot: the volatile field may change between the check and the join.
        ListenerThread thread = this.listenerThread;
        if (thread != null) {
            try {
                thread.join();
            }
            catch (InterruptedException interrupted) {
                // Do not lose the interruption request, let the caller observe it.
                Thread.currentThread().interrupt();
            }
            finally {
                this.listenerThread = null;
            }
        }
    }

    /**
     * Negatively acknowledges every message still registered as unacknowledged and forgets them.
     *
     * @throws JMSException if the session is closed or transacted, or if a negative
     *     acknowledgement fails
     */
    public void recover() throws JMSException {
        this.checkNotClosed();
        if (this.transacted) {
            throw new IllegalStateException("cannot call this method inside a transacted session");
        }
        log.info("recover, unacked messages {}", this.unackedMessages);
        // Iterate over a copy, as acknowledgeAllMessages() does, so that the loop is not affected
        // if handling a message ends up unregistering it from the list.
        for (PulsarMessage msg : new ArrayList<PulsarMessage>(this.unackedMessages)) {
            log.info("recovering message {}", msg);
            msg.negativeAck();
        }
        this.unackedMessages.clear();
    }

    /** Returns the listener set with {@link #setMessageListener(MessageListener)}, or {@code null}. */
    public MessageListener getMessageListener() throws JMSException {
        return this.messageListener;
    }

    /**
     * Stores the session-level message listener.
     *
     * @param listener the listener, must not be {@code null}
     * @throws NullPointerException if {@code listener} is {@code null}
     */
    public void setMessageListener(MessageListener listener) throws JMSException {
        Objects.requireNonNull(listener);
        this.messageListener = listener;
    }

    /**
     * Performs one round of listener polling over the registered consumers. When there is no
     * consumer or the connection is not started it just sleeps briefly, so that the calling
     * loop does not spin.
     */
    public void run() {
        if (this.consumers.isEmpty() || !this.connection.isStarted()) {
            try {
                Thread.sleep(LISTENER_IDLE_SLEEP_MILLIS);
            }
            catch (InterruptedException ignored) {
                // Deliberately ignored: the caller loops on the closed flag, and re-setting the
                // interrupt flag here would make every following sleep return immediately.
            }
            return;
        }
        for (PulsarMessageConsumer consumer : this.consumers) {
            try {
                this.connection.executeInConnectionPausedLock(() -> {
                    consumer.runListener(LISTENER_POLL_TIMEOUT_MILLIS);
                    return null;
                }, 0);
            }
            catch (Throwable err) {
                log.error("Error in Session Thread {}", this, err);
            }
            // Stop the round as soon as the connection gets stopped.
            if (!this.connection.isStarted()) {
                return;
            }
        }
    }

    /**
     * Creates a producer for the given destination. As a side effect the client id of the
     * connection can no longer be set.
     *
     * @param destination the target destination
     * @throws JMSException if the producer cannot be created
     */
    public PulsarMessageProducer createProducer(Destination destination) throws JMSException {
        this.connection.setAllowSetClientId(false);
        return new PulsarMessageProducer(this, destination);
    }

    /**
     * Creates a consumer without a message selector.
     *
     * @see #createConsumer(Destination, String, boolean)
     */
    public PulsarMessageConsumer createConsumer(Destination destination) throws JMSException {
        return this.createConsumer(destination, null);
    }

    /**
     * Creates a consumer with {@code noLocal} set to {@code false}.
     *
     * @see #createConsumer(Destination, String, boolean)
     */
    public PulsarMessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
        // The null-destination check is performed by the overload below.
        return this.createConsumer(destination, messageSelector, false);
    }

    /**
     * Creates and subscribes a non-durable consumer whose subscription name is a random UUID and
     * whose subscription type is the one the factory provides for simple consumers.
     *
     * @param destination the destination to consume from, must be a {@link PulsarDestination}
     * @param messageSelector selector passed to the consumer, may be {@code null}
     * @param noLocal passed to the consumer
     * @throws InvalidDestinationException if {@code destination} is {@code null}
     * @throws JMSException if the subscription fails
     */
    public PulsarMessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
        if (destination == null) {
            throw new InvalidDestinationException("null destination");
        }
        String subscriptionName = UUID.randomUUID().toString();
        SubscriptionType subscriptionType = this.getFactory().getExclusiveSubscriptionTypeForSimpleConsumers();
        return new PulsarMessageConsumer(subscriptionName, (PulsarDestination)destination, this, SubscriptionMode.NonDurable, subscriptionType, messageSelector, false, noLocal).subscribe();
    }

    /**
     * Creates a shared non-durable consumer without a message selector.
     *
     * @see #createSharedConsumer(Topic, String, String)
     */
    public PulsarMessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName) throws JMSException {
        return this.createSharedConsumer(topic, sharedSubscriptionName, null);
    }

    /**
     * Creates and subscribes a shared non-durable consumer. The subscription name is first
     * passed through {@code PulsarConnection.prependClientId} and registered on this session.
     *
     * @param topic the topic to consume from
     * @param sharedSubscriptionName the subscription name requested by the caller
     * @param messageSelector selector passed to the consumer, may be {@code null}
     * @throws InvalidDestinationException if {@code topic} is {@code null}
     * @throws JMSException if topic operations are disabled or the subscription fails
     */
    public PulsarMessageConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector) throws JMSException {
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        this.checkTopicOperationEnabled();
        sharedSubscriptionName = this.connection.prependClientId(sharedSubscriptionName, true);
        this.registerSubscriptionName(topic, sharedSubscriptionName, true);
        return new PulsarMessageConsumer(sharedSubscriptionName, (PulsarDestination)topic, this, SubscriptionMode.NonDurable, SubscriptionType.Shared, messageSelector, true, false).subscribe();
    }

    /**
     * Creates a queue handle for the given name.
     *
     * @throws JMSException if the session is closed or queue operations are disabled
     */
    public PulsarQueue createQueue(String queueName) throws JMSException {
        this.checkNotClosed();
        this.checkQueueOperationEnabled();
        return new PulsarQueue(queueName);
    }

    /**
     * Creates a topic handle for the given name.
     *
     * @throws JMSException if the session is closed or topic operations are disabled
     */
    public PulsarTopic createTopic(String topicName) throws JMSException {
        this.checkNotClosed();
        this.checkTopicOperationEnabled();
        return new PulsarTopic(topicName);
    }

    /**
     * Creates a durable subscriber without selector and with {@code noLocal} set to {@code false}.
     *
     * @see #createDurableSubscriber(Topic, String, String, boolean)
     */
    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
        return this.createDurableSubscriber(topic, name, null, false);
    }

    /**
     * Creates and subscribes a durable, exclusive consumer on the topic.
     *
     * @param topic the topic to consume from
     * @param name the subscription name requested by the caller
     * @param messageSelector selector passed to the consumer, may be {@code null}
     * @param noLocal passed to the consumer
     * @throws JMSException if topic operations are disabled, the topic is {@code null}, the name
     *     is already registered on this session for the same topic, or the subscription fails
     */
    public PulsarMessageConsumer createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        return this.createDurableSubscriber(topic, name, messageSelector, noLocal, false);
    }

    /**
     * Shared implementation of the durable exclusive consumers.
     *
     * @param allowUnsetClientId forwarded to {@code PulsarConnection.prependClientId}
     */
    private PulsarMessageConsumer createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal, boolean allowUnsetClientId) throws JMSException {
        this.checkTopicOperationEnabled();
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        name = this.connection.prependClientId(name, allowUnsetClientId);
        this.registerSubscriptionName(topic, name, false);
        return new PulsarMessageConsumer(name, (PulsarDestination)topic, this, SubscriptionMode.Durable, SubscriptionType.Exclusive, messageSelector, true, noLocal).subscribe();
    }

    /**
     * Remembers which topic a subscription name belongs to.
     *
     * @param topic the topic of the subscription
     * @param name the (already prefixed) subscription name
     * @param shared whether the subscription is shared; only non-shared subscriptions are
     *     rejected when the same name is already registered for an equal topic
     * @throws IllegalStateException (JMS) on such a duplicate registration
     */
    private void registerSubscriptionName(Topic topic, String name, boolean shared) throws JMSException {
        PulsarDestination previous = this.destinationBySubscription.put(name, (PulsarDestination)topic);
        boolean sameTopicAlreadyRegistered = previous != null && previous.equals(topic);
        if (sameTopicAlreadyRegistered && !shared) {
            throw new IllegalStateException("a subscription with name " + name + " is already registered on this session");
        }
    }

    /** Forgets the subscription name, but only if it is currently registered for an equal topic. */
    private void unregisterSubscriptionName(String name, Topic topic) {
        PulsarDestination existing = this.destinationBySubscription.get(name);
        if (existing != null && existing.equals(topic)) {
            this.destinationBySubscription.remove(name);
        }
    }

    /**
     * Creates a durable consumer without selector and with {@code noLocal} set to {@code false}.
     *
     * @see #createDurableConsumer(Topic, String, String, boolean)
     */
    public PulsarMessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
        return this.createDurableConsumer(topic, name, null, false);
    }

    /**
     * Creates and subscribes a durable, exclusive consumer; same implementation as
     * {@link #createDurableSubscriber(Topic, String, String, boolean)}.
     */
    public PulsarMessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
        return this.createDurableSubscriber(topic, name, messageSelector, noLocal, false);
    }

    /**
     * Creates a shared durable consumer without a message selector.
     *
     * @see #createSharedDurableConsumer(Topic, String, String)
     */
    public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
        return this.createSharedDurableConsumer(topic, name, null);
    }

    /**
     * Creates and subscribes a shared durable consumer. The subscription name is first passed
     * through {@code PulsarConnection.prependClientId} and registered on this session.
     *
     * @param topic the topic to consume from
     * @param name the subscription name requested by the caller
     * @param messageSelector selector passed to the consumer, may be {@code null}
     * @throws InvalidDestinationException if {@code topic} is {@code null}
     * @throws JMSException if topic operations are disabled or the subscription fails
     */
    public PulsarMessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException {
        if (topic == null) {
            throw new InvalidDestinationException("null destination");
        }
        this.checkTopicOperationEnabled();
        name = this.connection.prependClientId(name, true);
        this.registerSubscriptionName(topic, name, true);
        return new PulsarMessageConsumer(name, (PulsarDestination)topic, this, SubscriptionMode.Durable, SubscriptionType.Shared, messageSelector, true, false).subscribe();
    }

    /**
     * Creates a browser without a message selector.
     *
     * @see #createBrowser(Queue, String)
     */
    public QueueBrowser createBrowser(Queue queue) throws JMSException {
        return this.createBrowser(queue, null);
    }

    /**
     * Creates a browser on the given queue.
     *
     * @param queue the queue to browse
     * @param messageSelector selector passed to the browser, may be {@code null}
     * @throws InvalidDestinationException if {@code queue} is {@code null}
     * @throws JMSException if queue operations are disabled or the browser cannot be created
     */
    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
        if (queue == null) {
            throw new InvalidDestinationException("invalid null queue");
        }
        this.checkQueueOperationEnabled();
        return new PulsarQueueBrowser(this, queue, messageSelector);
    }

    /**
     * Asks the connection to create a temporary queue for this session.
     *
     * @throws JMSException if the session is closed or queue operations are disabled
     */
    public TemporaryQueue createTemporaryQueue() throws JMSException {
        this.checkNotClosed();
        this.checkQueueOperationEnabled();
        return this.connection.createTemporaryQueue(this);
    }

    /**
     * Asks the connection to create a temporary topic for this session.
     *
     * @throws JMSException if the session is closed or topic operations are disabled
     */
    public TemporaryTopic createTemporaryTopic() throws JMSException {
        this.checkNotClosed();
        this.checkTopicOperationEnabled();
        return this.connection.createTemporaryTopic(this);
    }

    /**
     * Deletes the named subscription through the factory. The destination is looked up among
     * the subscriptions registered on this session; when it is unknown an error is logged and
     * the deletion is still attempted without a destination.
     *
     * @param name the subscription name as given by the caller (the client id prefix is applied here)
     * @throws InvalidDestinationException if the factory reports that nothing was deleted
     * @throws JMSException if the session is closed or the deletion fails
     */
    public void unsubscribe(String name) throws JMSException {
        this.checkNotClosed();
        name = this.connection.prependClientId(name, true);
        PulsarDestination destination = this.destinationBySubscription.remove(name);
        if (destination == null) {
            log.error("Cannot unsubscribe {}, please open and close the subscription within this session before unsubscribing, because in Pulsar you need to known the Destination for the subscription. Known subscription names {}", name, this.destinationBySubscription);
        }
        boolean someThingDone = this.getFactory().deleteSubscription(destination, name);
        if (!someThingDone) {
            throw new InvalidDestinationException("Subscription " + name + " not found");
        }
    }

    /**
     * Acknowledges every message still registered as unacknowledged and forgets them.
     *
     * @throws JMSException if the session is closed or an acknowledgement fails
     */
    void acknowledgeAllMessages() throws JMSException {
        this.checkNotClosed();
        // Iterate over a copy so the loop is independent from changes to the underlying list.
        for (PulsarMessage unackedMessage : new ArrayList<PulsarMessage>(this.unackedMessages)) {
            unackedMessage.acknowledgeInternal();
        }
        this.unackedMessages.clear();
    }

    /** Adds the message to the list of unacknowledged messages of this session. */
    public void registerUnacknowledgedMessage(PulsarMessage result) {
        this.unackedMessages.add(result);
    }

    /** Removes the message from the list of unacknowledged messages of this session. */
    public void unregisterUnacknowledgedMessage(PulsarMessage result) {
        this.unackedMessages.remove(result);
    }

    /**
     * Detaches a consumer from the session. When the consumer has an underlying Pulsar consumer
     * it is removed from the session and from the factory, together with the unacknowledged
     * messages received through it. The subscription name is unregistered when the consumer
     * asks for it.
     *
     * @param consumer the consumer being closed
     */
    public void removeConsumer(PulsarMessageConsumer consumer) {
        Consumer<byte[]> pulsarConsumer = consumer.getInternalConsumer();
        if (pulsarConsumer != null) {
            this.consumers.remove(consumer);
            this.getFactory().removeConsumer(pulsarConsumer);
            this.unackedMessages.removeIf(message -> message.isReceivedFromConsumer(consumer));
        }
        if (consumer.unregisterSubscriptionOnClose) {
            this.unregisterSubscriptionName(consumer.subscriptionName, (Topic)consumer.getDestination());
        }
    }

    /** Logs an internal error. */
    public void onError(Throwable err) {
        log.error("Internal error ", err);
    }

    /**
     * Attaches a consumer to the session. As a side effect the client id of the connection can
     * no longer be set.
     */
    public void registerConsumer(PulsarMessageConsumer consumer) {
        this.consumers.add(consumer);
        this.connection.setAllowSetClientId(false);
    }

    /** Returns the flag set with {@link #setJms20(boolean)}; {@code false} by default. */
    public boolean isJms20() {
        return this.jms20;
    }

    /** Sets the JMS 2.0 flag of this session. */
    public void setJms20(boolean jms20) {
        this.jms20 = jms20;
    }

    /** Returns the owning connection. */
    public PulsarConnection getConnection() {
        return this.connection;
    }

    /**
     * Runs the operation through {@code PulsarConnection.executeInConnectionPausedLock}.
     *
     * @param operation the code to run
     * @param timeoutMillis forwarded to the connection
     * @return whatever the connection returns for the operation
     * @throws JMSException if the session is closed or the operation fails
     */
    <T> T executeOperationIfConnectionStarted(BlockCLoseOperation<T> operation, int timeoutMillis) throws JMSException {
        this.checkNotClosed();
        return (T)this.connection.executeInConnectionPausedLock(operation::execute, timeoutMillis);
    }

    /**
     * Runs the operation while holding the read side of the close lock, so that it cannot
     * overlap with {@link #close()}.
     *
     * @param operation the code to run
     * @return the result of the operation
     * @throws JMSException if the session is closed or the operation fails
     */
    <T> T executeCriticalOperation(BlockCLoseOperation<T> operation) throws JMSException {
        this.checkNotClosed();
        this.closeLock.readLock().lock();
        try {
            // Checked again: the session may have been closed while waiting for the lock.
            this.checkNotClosed();
            return operation.execute();
        }
        finally {
            this.closeLock.readLock().unlock();
        }
    }

    /** Same as {@link #createConsumer(Destination)}. */
    public QueueReceiver createReceiver(Queue queue) throws JMSException {
        return this.createConsumer((Destination)queue);
    }

    /** Same as {@link #createConsumer(Destination, String)}; {@code s} is the message selector. */
    public QueueReceiver createReceiver(Queue queue, String s) throws JMSException {
        return this.createConsumer((Destination)queue, s);
    }

    /** Same as {@link #createProducer(Destination)}. */
    public QueueSender createSender(Queue queue) throws JMSException {
        return this.createProducer((Destination)queue);
    }

    /** Same as {@link #createConsumer(Destination)}. */
    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
        return this.createConsumer((Destination)topic);
    }

    /**
     * Same as {@link #createConsumer(Destination, String, boolean)}; {@code s} is the message
     * selector and {@code b} the {@code noLocal} flag.
     */
    public TopicSubscriber createSubscriber(Topic topic, String s, boolean b) throws JMSException {
        return this.createConsumer((Destination)topic, s, b);
    }

    /** Same as {@link #createProducer(Destination)}. */
    public TopicPublisher createPublisher(Topic topic) throws JMSException {
        return this.createProducer((Destination)topic);
    }

    /**
     * Fails when this session has been closed, then delegates the same check to the connection.
     *
     * @throws JMSException if the session or the connection is closed
     */
    public void checkNotClosed() throws JMSException {
        if (this.closed) {
            throw new IllegalStateException("Session is closed");
        }
        this.connection.checkNotClosed();
    }

    /**
     * Tells whether the session is closed. The flag is read under the read side of the close
     * lock, so the call waits for a {@link #close()} in progress.
     */
    public boolean isClosed() {
        this.closeLock.readLock().lock();
        try {
            return this.closed;
        }
        finally {
            this.closeLock.readLock().unlock();
        }
    }

    /** Starts the listener thread if none is currently set. */
    void ensureListenerThread() {
        if (this.listenerThread == null) {
            this.listenerThread = new ListenerThread();
            this.listenerThread.start();
        }
    }

    /**
     * Fails when queue operations have been disabled on this session.
     *
     * @throws JMSException if this session does not allow queue operations
     */
    void checkQueueOperationEnabled() throws JMSException {
        if (!this.allowQueueOperations) {
            throw new IllegalStateException("This is not a QueueSession");
        }
    }

    /**
     * Fails when topic operations have been disabled on this session.
     *
     * @throws JMSException if this session does not allow topic operations
     */
    void checkTopicOperationEnabled() throws JMSException {
        if (!this.allowTopicOperations) {
            throw new IllegalStateException("This is not a TopicSession");
        }
    }

    /**
     * Selects which domain specific operations are allowed on this session.
     *
     * @param queue whether queue operations are allowed
     * @param topic whether topic operations are allowed
     * @return this session
     */
    PulsarSession emulateLegacySession(boolean queue, boolean topic) {
        this.allowQueueOperations = queue;
        this.allowTopicOperations = topic;
        return this;
    }

    /** Daemon thread that keeps calling {@link PulsarSession#run()} until the session is closed. */
    private class ListenerThread extends Thread {
        private ListenerThread() {
            super("jms-session-thread");
            this.setDaemon(true);
        }

        @Override
        public void run() {
            while (!PulsarSession.this.closed) {
                PulsarSession.this.run();
            }
        }
    }

    /**
     * A unit of work that may fail with a {@link JMSException}.
     *
     * @param <T> the type of the result
     */
    @FunctionalInterface
    static interface BlockCLoseOperation<T> {
        public T execute() throws JMSException;
    }
}

