/*
 * Decompiled with CFR 0.152.
 */
package com.solacesystems.jms;

import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jms.ConfigurationException;
import com.solacesystems.jms.SolConnection;
import com.solacesystems.jms.SolConnectionConsumerIF;
import com.solacesystems.jms.SolConnectionIF;
import com.solacesystems.jms.SolDestinationInfo;
import com.solacesystems.jms.SolSession;
import com.solacesystems.jms.SolSessionIF;
import com.solacesystems.jms.encoding.JMSDecoder;
import com.solacesystems.jms.impl.AckHandler;
import com.solacesystems.jms.impl.ConnectionConsumerNoTransactionStrategy;
import com.solacesystems.jms.impl.ConnectionConsumerTransactionStrategy;
import com.solacesystems.jms.impl.ConnectionConsumerXATransactionStrategy;
import com.solacesystems.jms.impl.ConsumerFactory;
import com.solacesystems.jms.impl.JMSState;
import com.solacesystems.jms.impl.MessageAckHandlerImpl;
import com.solacesystems.jms.impl.SessionProperties;
import com.solacesystems.jms.impl.SessionTransactionType;
import com.solacesystems.jms.impl.Validator;
import com.solacesystems.jms.message.SolMessage;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class SolConnectionConsumerRA
implements SolConnectionConsumerIF {
    protected static final String Component = "ConnectionConsumer";
    static final Log log = LogFactory.getLog(SolConnectionConsumerRA.class);
    private final SolConnectionIF mConnection;
    private final SolDestinationInfo mDestInfo;
    private final ServerSessionPool mServerPool;
    private final int mMaxMessages;
    private final SessionProperties mSessionProps;
    private final AckHandler mAckHandler;
    private final ConnectionConsumerTransactionStrategy mAckTransaction;
    protected volatile JMSState mState;
    private volatile boolean sessionValidated = false;

    public SolConnectionConsumerRA(SolConnectionIF conn, SolDestinationInfo destInfo, String messageSelector, ServerSessionPool serverPool, int maxMessages, JMSState state) throws JMSException {
        Validator.checkServerSessionPool(serverPool);
        Validator.checkConnectionConsumerMaxMessages(maxMessages);
        this.mServerPool = serverPool;
        this.mConnection = conn;
        this.mDestInfo = destInfo;
        this.mState = state;
        boolean noLocal = false;
        this.mSessionProps = this.constructSessionProperties(conn, serverPool);
        boolean isDirect = this.mConnection.getProperties().getPropertyBean().getDirectTransport();
        Validator.checkTransactedAndAckMode(this.mSessionProps.getTransactionType(), this.mSessionProps.getAcknowledgeMode(), isDirect);
        Validator.checkTransactedAndLargeMessaging(this.mSessionProps.getTransactionType(), this.mConnection.getJCSMPProperties().getBooleanProperty("large_messaging"));
        ConsumerFactory cmrFactory = new ConsumerFactory(messageSelector, noLocal, this.mSessionProps, conn, null);
        this.mAckHandler = this.mSessionProps.getAckHandler();
        ((SolConnection)this.mConnection).addConnectionConsumer(this);
        ConnectionConsumerTransactionStrategy.InitProperties txStrategyProp = new ConnectionConsumerTransactionStrategy.InitProperties();
        txStrategyProp.withConnection(this.mConnection).withConsumerFactory(cmrFactory).withDestinationInfo(this.mDestInfo).withSessionProperties(this.mSessionProps);
        this.mAckTransaction = this.mSessionProps.getAcknowledgeMode() == 0 ? new ConnectionConsumerXATransactionStrategy(txStrategyProp) : new ConnectionConsumerNoTransactionStrategy(txStrategyProp);
        this.mMaxMessages = maxMessages;
        this.createConsumer();
        this.startWorkerThread();
        if (log.isDebugEnabled()) {
            log.debug((Object)("SolConnectionConsumerRA created. Connection: " + conn.toString() + "   Destination: " + destInfo.toString()));
        }
    }

    private SessionProperties constructSessionProperties(SolConnectionIF connection, ServerSessionPool serverPool) throws JMSException {
        boolean xa = connection instanceof XAConnection;
        int acknowledgeMode = xa ? 0 : 1;
        log.debug((Object)String.format("Sessions in ServerSessionPool should be: XA:%s ackMode:%s", xa, acknowledgeMode));
        MessageAckHandlerImpl ackHandler = new MessageAckHandlerImpl(acknowledgeMode);
        return new SessionProperties(connection.getProperties(), xa ? SessionTransactionType.XATransaction : SessionTransactionType.NoTransaction, acknowledgeMode, ackHandler);
    }

    public ServerSessionPool getServerSessionPool() throws JMSException {
        return this.mServerPool;
    }

    @Override
    public void start() throws JMSException {
        Validator.checkClosed(this.mState, Component);
        if (this.mState == JMSState.Stopped) {
            if (log.isDebugEnabled()) {
                log.debug((Object)"Starting Connection Consumer.");
            }
            this.mState = JMSState.Started;
            this.startConsumer();
            if (log.isDebugEnabled()) {
                log.debug((Object)"Connection consumer started.");
            }
        }
    }

    @Override
    public void stop() throws JMSException {
        Validator.checkClosed(this.mState, Component);
        if (this.mState == JMSState.Started) {
            if (log.isDebugEnabled()) {
                log.debug((Object)"Stopping Connection Consumer.");
            }
            this.mState = JMSState.Stopped;
            this.stopConsumer();
            if (log.isDebugEnabled()) {
                log.debug((Object)"Connection Consumer stopped.");
            }
        }
    }

    private void startConsumer() throws JMSException {
        try {
            this.mAckTransaction.start();
        }
        catch (JMSException e) {
            this.deliverException(e);
            throw e;
        }
    }

    private void stopConsumer() {
        this.mAckTransaction.stop();
    }

    protected void createConsumer() throws JMSException {
        this.mAckTransaction.createConsumer();
        if (this.mState == JMSState.Started) {
            this.startConsumer();
        }
    }

    @Override
    public void close() throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug((Object)"Connection Consumer closed.");
        }
        this.mState = JMSState.Closed;
        this.mAckTransaction.closeConsumer();
        this.mAckTransaction.close();
        ((SolConnection)this.mConnection).removeConnectionConsumer(this);
    }

    public String toString() {
        String fmt = String.format("SolConnectionConsumerRA(Destination:%s SubscriptionName:%s SessionProperties:%s TransactionState:%s)", new Object[]{this.mDestInfo.destination, this.mDestInfo.subscriptionName, this.mSessionProps, this.mAckTransaction.getTxState()});
        return fmt;
    }

    protected SolMessage createMessage(BytesXMLMessage xmlMessage) throws JMSException {
        if (xmlMessage != null) {
            SolMessage jmsMessage = JMSDecoder.createJMSMessage(xmlMessage);
            jmsMessage.setAckHandler(this.mAckHandler);
            this.mAckHandler.onMessageCreate(jmsMessage);
            return jmsMessage;
        }
        return null;
    }

    private void startWorkerThread() {
        Thread worker = new Thread(new Runnable(){

            public void run() {
                SolConnectionConsumerRA.this.mainLoop();
            }
        });
        String randStr = UUID.randomUUID().toString();
        randStr = randStr.substring(randStr.length() - 8);
        worker.setName("SolConnectionConsumerRAWorker-" + randStr);
        worker.setDaemon(true);
        worker.start();
    }

    void mainLoop() {
        ArrayList<Message> msgBatch = new ArrayList<Message>(this.mMaxMessages);
        try {
            while (this.mState != JMSState.Closed) {
                block13: {
                    if (this.mAckTransaction.getTxState() == ConnectionConsumerTransactionStrategy.TransactionState.ACTIVE || this.mAckTransaction.getTxState() == ConnectionConsumerTransactionStrategy.TransactionState.NONTRANSACTED) {
                        try {
                            msgBatch.clear();
                            while (msgBatch.size() < this.mMaxMessages) {
                                int pre_sz = msgBatch.size();
                                this.pollMessage(msgBatch);
                                if (msgBatch.size() != pre_sz) continue;
                                break;
                            }
                            if (msgBatch.size() > 0) {
                                this.deliver(msgBatch);
                                this.mAckTransaction.afterDelivery();
                                msgBatch.clear();
                            }
                        }
                        catch (JCSMPException ex) {
                            if (this.mState == JMSState.Closed) break block13;
                            throw Validator.createJMSException("soljms.operation.recv", (Throwable)((Object)ex));
                        }
                    }
                }
                this.mAckTransaction.afterPollLoop();
            }
        }
        catch (JMSException ex) {
            if (this.mState != JMSState.Closed) {
                log.error((Object)("Exception occurred in receive loop, stop connection consumer - " + ex.getMessage()), (Throwable)ex);
                try {
                    this.stop();
                }
                catch (JMSException e) {
                    // empty catch block
                }
                this.deliverException(ex);
            }
            log.debug((Object)("Got JMSException: \"" + ex.getMessage() + "\" in main loop but JMSState was already closed - ignoring"));
        }
        catch (Exception ex) {
            JMSException jmsEx = Validator.createJMSException(null, ex);
            this.deliverException(jmsEx);
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)"SolConnectionConsumerRA.mainLoop() exit.");
        }
    }

    private void pollMessage(List<Message> batch) throws JCSMPException, JMSException {
        BytesXMLMessage msg = null;
        msg = batch.size() == 0 ? this.mAckTransaction.getConsumer().receive(20) : this.mAckTransaction.getConsumer().receiveNoWait();
        if (msg != null) {
            batch.add(this.createMessage(msg));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void deliver(Collection<Message> msgs) throws JMSException {
        ArrayList<Message> deliveryBatch = new ArrayList<Message>(msgs);
        this.mAckTransaction.onMessage(deliveryBatch);
        ServerSession serverSession = this.mServerPool.getServerSession();
        if (this.mState == JMSState.Closed) {
            log.debug((Object)"ConnectionConsumer got a server session in closed state");
            serverSession.start();
            throw new JMSException("Aborting deliver because ConnectionConsumer state has changed to closed.");
        }
        Session deliverySession = serverSession.getSession();
        try {
            if (!this.sessionValidated) {
                if (!(deliverySession instanceof SolSessionIF)) {
                    throw new ConfigurationException("Sessions in ServerSessionPool must be Solace JMS Session");
                }
                boolean transacted = deliverySession.getTransacted();
                int acknowledgeMode = deliverySession.getAcknowledgeMode();
                log.debug((Object)String.format("deliverySession: transacted:%s ackMode:%s sniffSession:%s", transacted, acknowledgeMode, deliverySession));
                if (!(this.mConnection instanceof XAConnection) && transacted) {
                    throw new ConfigurationException("Sessions in ServerSessionPool cannot be transacted Session or XASession if ConnectionConsumer is created from Connection");
                }
                if (this.mConnection instanceof XAConnection && !(deliverySession instanceof XASession)) {
                    throw new ConfigurationException("Sessions in ServerSessionPool must be XASession if ConnectionConsumer is created from XAConnection");
                }
                SolSession solSession = (SolSession)deliverySession;
                if (solSession.getConnection() != this.mConnection) {
                    throw new ConfigurationException("Sessions in ServerSessionPool must be created from the same Connection instance as ConnectionConsumer");
                }
                if (!transacted && acknowledgeMode == 2) {
                    throw new ConfigurationException("Sessions in ServerSessionPool cannot have client acknowledge mode");
                }
                this.sessionValidated = true;
            }
            ((SolSession)deliverySession).loadMessages(deliveryBatch, this);
        }
        finally {
            serverSession.start();
        }
    }

    private void deliverException(JMSException ex) {
        ExceptionListener listener = this.mConnection.getProperties().getExceptionListener();
        log.debug((Object)String.format("%s Delivering exception to connection exception listener (%s): %s", this.toString(), listener, ex.toString()));
        if (listener != null) {
            listener.onException(ex);
        }
    }

    @Override
    public void commitBatch(Collection<Message> msgs) throws JMSException {
        throw new ConfigurationException("commitBatch call is unsupported by this implementation of ConnectionConsumer");
    }

    @Override
    public void rollbackBatch(Collection<Message> msgs) throws JMSException {
        throw new ConfigurationException("rollbackBatch call is unsupported by this implementation of ConnectionConsumer");
    }

    public AckHandler getAckHandler() {
        return this.mAckHandler;
    }

    public ConnectionConsumerTransactionStrategy getAckTransaction() {
        return this.mAckTransaction;
    }
}

