/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.ObjectMessage;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.test.JmsResourceProvider;
import org.apache.activemq.test.TestSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class JmsTransactionTestSupport
extends TestSupport
implements MessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(JmsTransactionTestSupport.class);
    /** Number of messages exchanged by {@link #testMessageListener()}. */
    private static final int MESSAGE_COUNT = 5;
    /** Prefix of the text bodies sent by {@link #testMessageListener()}. */
    private static final String MESSAGE_TEXT = "message";
    /** Connection (re)created by {@link #reconnect()}. */
    protected Connection connection;
    /** Session (re)created by {@link #reconnectSession()}. */
    protected Session session;
    /** Consumer bound to {@link #destination}. */
    protected MessageConsumer consumer;
    /** Producer bound to {@link #destination}. */
    protected MessageProducer producer;
    /** Factory for all JMS resources used by the tests; supplied by the subclass. */
    protected JmsResourceProvider resourceProvider;
    /** Destination the tests send to and receive from. */
    protected Destination destination;
    /** Number of batches sent by {@link #testSendReceiveTransactedBatches()}. */
    protected int batchCount = 10;
    /** Number of messages per batch in {@link #testSendReceiveTransactedBatches()}. */
    protected int batchSize = 20;
    /** Broker started in {@link #setUp()} and stopped in {@link #tearDown()}. */
    protected BrokerService broker;
    // The two lists and the flag below are written by onMessage() and polled by the
    // test method while it sleeps, so they must be safe to share between threads.
    private final List<Message> unackMessages =
        Collections.synchronizedList(new ArrayList<Message>(MESSAGE_COUNT));
    private final List<Message> ackMessages =
        Collections.synchronizedList(new ArrayList<Message>(MESSAGE_COUNT));
    /** Set by onMessage() once the first delivery round has been rolled back. */
    private volatile boolean resendPhase;

    /** Creates the test case without an explicit name. */
    public JmsTransactionTestSupport() {
        super();
    }

    /**
     * Creates the test case with the given name.
     *
     * @param name name handed to the superclass constructor
     */
    public JmsTransactionTestSupport(String name) {
        super(name);
    }

    /**
     * Starts the broker, configures the resource provider and opens the first
     * connection, session, producer and consumer.
     *
     * @throws Exception if the broker or any JMS resource cannot be created
     */
    protected void setUp() throws Exception {
        // The broker must be up before any client resource is created.
        broker = createBroker();
        broker.start();
        broker.waitUntilStarted();

        resourceProvider = getJmsResourceProvider();
        topic = resourceProvider.isTopic();
        setSessionTransacted();

        connectionFactory = newConnectionFactory();
        reconnect();
    }

    /** Tells the resource provider to create transacted sessions. */
    protected void setSessionTransacted() {
        resourceProvider.setTransacted(true);
    }

    /**
     * Obtains the connection factory from the resource provider.
     *
     * @return the factory used by {@link #reconnect()} to open connections
     * @throws Exception if the provider cannot create the factory
     */
    protected ConnectionFactory newConnectionFactory() throws Exception {
        ConnectionFactory factory = resourceProvider.createConnectionFactory();
        return factory;
    }

    /**
     * Hook called by the tests before each unit of transactional work.
     * This implementation does nothing.
     *
     * @throws Exception declared so that overriding implementations may fail
     */
    protected void beginTx() throws Exception {
        // Intentionally empty.
    }

    /**
     * Commits the work done on the current session.
     *
     * @throws Exception if the commit fails
     */
    protected void commitTx() throws Exception {
        session.commit();
    }

    /**
     * Rolls back the work done on the current session.
     *
     * @throws Exception if the rollback fails
     */
    protected void rollbackTx() throws Exception {
        session.rollback();
    }

    /**
     * Creates (but does not start) a non-persistent broker for the test.
     * Also sets the {@code org.apache.activemq.SERIALIZABLE_PACKAGES} system
     * property to {@code java.util}.
     *
     * @return the broker built from the {@code broker://} URI
     * @throws Exception if the broker cannot be created
     * @throws URISyntaxException if the broker URI is malformed
     */
    protected BrokerService createBroker() throws Exception, URISyntaxException {
        System.setProperty("org.apache.activemq.SERIALIZABLE_PACKAGES", "java.util");
        URI brokerUri = new URI("broker://()/localhost?persistent=false");
        return BrokerFactory.createBroker(brokerUri);
    }

    /**
     * Closes the session and connection and stops the broker, best effort.
     * A failure in one step is logged (with its cause) and does not prevent
     * the remaining steps from running. Each field is cleared whether or not
     * its resource closed cleanly, and resources that were never created are
     * skipped instead of triggering a NullPointerException.
     *
     * @throws Exception declared for overriding implementations; this
     *         implementation logs failures rather than propagating them
     */
    protected void tearDown() throws Exception {
        LOG.info("Closing down connection");
        if (session != null) {
            try {
                session.close();
            } catch (Exception e) {
                LOG.info("Caught exception while closing resources.", e);
            } finally {
                session = null;
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                LOG.info("Caught exception while closing resources.", e);
            } finally {
                connection = null;
            }
        }
        if (broker != null) {
            try {
                broker.stop();
                broker.waitUntilStopped();
            } catch (Exception e) {
                LOG.info("Caught exception while shutting down the Broker", e);
            } finally {
                broker = null;
            }
        }
        LOG.info("Connection closed.");
    }

    /**
     * Supplies the provider that {@link #setUp()} stores in
     * {@code resourceProvider} and that is then used to create the connection
     * factory, connection, session, destination, producer and consumer.
     *
     * @return the resource provider for the concrete test
     */
    protected abstract JmsResourceProvider getJmsResourceProvider();

    /**
     * Sends {@code batchCount} batches of {@code batchSize} messages, each
     * batch in its own transaction, and consumes every batch in a second
     * transaction before producing the next one.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testSendReceiveTransactedBatches() throws Exception {
        TextMessage message = session.createTextMessage("Batch Message");
        int batch = 0;
        while (batch < batchCount) {
            LOG.info("Producing batch " + batch + " of " + batchSize + " messages");
            beginTx();
            for (int sent = 0; sent < batchSize; sent++) {
                // From the second batch on, 'message' is the last message received below.
                producer.send(message);
            }
            messageSent();
            commitTx();

            LOG.info("Consuming batch " + batch + " of " + batchSize + " messages");
            beginTx();
            for (int received = 0; received < batchSize; received++) {
                message = (TextMessage) consumer.receive(5000L);
                assertNotNull("Received only " + received + " messages in batch " + batch, message);
                assertEquals("Batch Message", message.getText());
            }
            commitTx();
            batch++;
        }
    }

    /**
     * Hook called by {@link #testSendReceiveTransactedBatches()} after a batch
     * has been sent and before it is committed. This implementation does nothing.
     *
     * @throws Exception declared so that overriding implementations may fail
     */
    protected void messageSent() throws Exception {
        // Intentionally empty.
    }

    /**
     * Commits one message, rolls back a second and commits a third, then
     * checks that only the two committed messages are received, in order.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testSendRollback() throws Exception {
        Message[] outbound = {
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        // Committed send.
        beginTx();
        producer.send(outbound[0]);
        commitTx();

        // This send is rolled back and must never be delivered.
        beginTx();
        producer.send(session.createTextMessage("I'm going to get rolled back."));
        rollbackTx();

        // Committed send.
        beginTx();
        producer.send(outbound[1]);
        commitTx();

        beginTx();
        LOG.info("About to consume message 1");
        Message firstReceived = consumer.receive(1000L);
        LOG.info("Received: " + firstReceived);
        LOG.info("About to consume message 2");
        Message secondReceived = consumer.receive(4000L);
        LOG.info("Received: " + secondReceived);
        commitTx();

        Message[] inbound = {firstReceived, secondReceived};
        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
    }

    /**
     * Calls {@code acknowledge()} on a message both inside and after the
     * sending transaction and checks the message is still delivered.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testAckMessageInTx() throws Exception {
        Message sent = session.createTextMessage("First Message");
        Message[] outbound = {sent};

        beginTx();
        producer.send(sent);
        sent.acknowledge();
        commitTx();
        sent.acknowledge();

        beginTx();
        LOG.info("About to consume message 1");
        Message received = consumer.receive(1000L);
        LOG.info("Received: " + received);
        commitTx();

        Message[] inbound = {received};
        assertTextMessagesEqual("Message not delivered.", outbound, inbound);
    }

    /**
     * Sends a message in a transaction that is abandoned by recreating the
     * session, and checks that only the messages sent in committed
     * transactions are received.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testSendSessionClose() throws Exception {
        Message[] outbound = {
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        beginTx();
        producer.send(outbound[0]);
        commitTx();

        // Never committed: the session is replaced before the transaction ends.
        beginTx();
        producer.send(session.createTextMessage("I'm going to get rolled back."));
        consumer.close();
        reconnectSession();

        // NOTE(review): unlike testSendSessionAndConnectionClose(), beginTx() is not
        // called before this send; confirm whether that is intentional.
        producer.send(outbound[1]);
        commitTx();

        LOG.info("About to consume message 1");
        beginTx();
        Message firstReceived = consumer.receive(1000L);
        LOG.info("Received: " + firstReceived);
        LOG.info("About to consume message 2");
        Message secondReceived = consumer.receive(4000L);
        LOG.info("Received: " + secondReceived);
        commitTx();

        Message[] inbound = {firstReceived, secondReceived};
        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
    }

    /**
     * Sends a message in a transaction that is abandoned by closing the
     * session and reopening the connection, and checks that only the messages
     * sent in committed transactions are received.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testSendSessionAndConnectionClose() throws Exception {
        Message[] outbound = {
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        beginTx();
        producer.send(outbound[0]);
        commitTx();

        // Never committed: session and connection are replaced first.
        beginTx();
        producer.send(session.createTextMessage("I'm going to get rolled back."));
        consumer.close();
        session.close();
        reconnect();

        beginTx();
        producer.send(outbound[1]);
        commitTx();

        LOG.info("About to consume message 1");
        beginTx();
        Message firstReceived = consumer.receive(1000L);
        LOG.info("Received: " + firstReceived);
        LOG.info("About to consume message 2");
        Message secondReceived = consumer.receive(4000L);
        LOG.info("Received: " + secondReceived);
        commitTx();

        Message[] inbound = {firstReceived, secondReceived};
        assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
    }

    /**
     * Receives and commits the first of two messages, receives the second and
     * rolls back, then checks that the second message is delivered again.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testReceiveRollback() throws Exception {
        Message[] outbound = {
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        // Drain anything already waiting on the destination.
        beginTx();
        Message drained;
        do {
            drained = consumer.receive(1000L);
        } while (drained != null);
        commitTx();

        beginTx();
        producer.send(outbound[0]);
        producer.send(outbound[1]);
        commitTx();
        LOG.info("Sent 0: " + outbound[0]);
        LOG.info("Sent 1: " + outbound[1]);

        // First message: received and committed.
        beginTx();
        Message firstReceived = consumer.receive(1000L);
        assertEquals(outbound[0], firstReceived);
        commitTx();

        // Second message: received, then the receipt is rolled back.
        beginTx();
        Message rolledBack = consumer.receive(1000L);
        assertNotNull(rolledBack);
        assertEquals(outbound[1], rolledBack);
        rollbackTx();

        // The rolled-back message has to show up again.
        beginTx();
        Message redelivered = consumer.receive(5000L);
        assertNotNull("Should have re-received the message again!", redelivered);
        commitTx();

        Message[] inbound = {firstReceived, redelivered};
        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
    }

    /**
     * Receives two messages in one transaction, rolls back, and checks that
     * both are delivered again in the original order with nothing after them.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testReceiveTwoThenRollback() throws Exception {
        Message[] outbound = {
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        // Drain anything already waiting on the destination.
        beginTx();
        Message drained;
        do {
            drained = consumer.receive(1000L);
        } while (drained != null);
        commitTx();

        beginTx();
        producer.send(outbound[0]);
        producer.send(outbound[1]);
        commitTx();
        LOG.info("Sent 0: " + outbound[0]);
        LOG.info("Sent 1: " + outbound[1]);

        // Receive both messages, then undo the receipt.
        beginTx();
        Message firstAttempt = consumer.receive(1000L);
        assertEquals(outbound[0], firstAttempt);
        Message secondAttempt = consumer.receive(1000L);
        assertNotNull(secondAttempt);
        assertEquals(outbound[1], secondAttempt);
        rollbackTx();

        // Both messages must be delivered again.
        beginTx();
        Message firstRedelivered = consumer.receive(5000L);
        assertNotNull("Should have re-received the first message again!", firstRedelivered);
        assertEquals(outbound[0], firstRedelivered);
        Message secondRedelivered = consumer.receive(5000L);
        assertNotNull("Should have re-received the second message again!", secondRedelivered);
        assertEquals(outbound[1], secondRedelivered);
        assertNull(consumer.receiveNoWait());
        commitTx();

        Message[] inbound = {firstRedelivered, secondRedelivered};
        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
    }

    /**
     * Sets every prefetch limit to one, sends four messages in one
     * transaction and receives all four in a second transaction.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testSendReceiveWithPrefetchOne() throws Exception {
        setPrefetchToOne();
        Message[] outbound = {
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message"),
            session.createTextMessage("Third Message"),
            session.createTextMessage("Fourth Message")
        };

        beginTx();
        for (Message message : outbound) {
            producer.send(message);
        }
        commitTx();

        beginTx();
        for (int i = 0; i < outbound.length; ++i) {
            // Log the actual ordinal; the message used to say "1" on every pass.
            LOG.info("About to consume message " + (i + 1));
            Message message = consumer.receive(1000L);
            assertNotNull(message);
            LOG.info("Received: " + message);
        }
        commitTx();
    }

    /**
     * Runs {@link #testReceiveTwoThenRollback()} five times in a row.
     *
     * @throws Exception if any repetition fails
     */
    public void testReceiveTwoThenRollbackManyTimes() throws Exception {
        int remaining = 5;
        while (remaining > 0) {
            testReceiveTwoThenRollback();
            remaining--;
        }
    }

    /**
     * Runs {@link #testSendRollback()} after setting all prefetch limits to one.
     *
     * @throws Exception if the delegated test fails
     */
    public void testSendRollbackWithPrefetchOfOne() throws Exception {
        setPrefetchToOne();
        testSendRollback();
    }

    /**
     * Runs {@link #testReceiveRollback()} after setting all prefetch limits to one.
     *
     * @throws Exception if the delegated test fails
     */
    public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
        setPrefetchToOne();
        testReceiveRollback();
    }

    /**
     * Receives the first of two messages, closes the consumer before
     * committing, and checks that a new consumer on the same session then
     * receives the second message.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testCloseConsumerBeforeCommit() throws Exception {
        TextMessage[] outbound = {
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        // Drain anything already waiting on the destination.
        beginTx();
        while (consumer.receiveNoWait() != null) {
        }
        commitTx();

        beginTx();
        producer.send(outbound[0]);
        producer.send(outbound[1]);
        commitTx();
        LOG.info("Sent 0: " + outbound[0]);
        LOG.info("Sent 1: " + outbound[1]);

        beginTx();
        TextMessage message = (TextMessage) consumer.receive(1000L);
        // Fail with a readable assertion instead of a NullPointerException on timeout.
        assertNotNull("Should have received the first message", message);
        assertEquals(outbound[0].getText(), message.getText());
        // The consumer is closed while its transaction is still open.
        consumer.close();
        commitTx();

        consumer = resourceProvider.createConsumer(session, destination);
        LOG.info("Created consumer: " + consumer);

        beginTx();
        message = (TextMessage) consumer.receive(1000L);
        assertNotNull("Should have received the second message", message);
        assertEquals(outbound[1].getText(), message.getText());
        commitTx();
    }

    /**
     * Receives an object message, checks its properties cannot be changed,
     * mutates the list taken from its body, rolls back, and checks that the
     * message received afterwards carries the original, unmodified body.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
        ArrayList<String> payload = new ArrayList<String>();
        payload.add("First");
        ObjectMessage outbound = session.createObjectMessage(payload);
        outbound.setStringProperty("foo", "abc");

        beginTx();
        producer.send(outbound);
        commitTx();

        LOG.info("About to consume message 1");
        beginTx();
        Message message = consumer.receive(5000L);
        List<String> firstBody = assertReceivedObjectMessageWithListBody(message);

        // Setting a property on the received message is expected to throw.
        try {
            message.setStringProperty("foo", "def");
            fail("Cannot change properties of the object!");
        } catch (JMSException e) {
            LOG.info("Caught expected exception: " + e, e);
        }

        // Tamper with the received body; the change must not survive the rollback.
        firstBody.clear();
        firstBody.add("This should never be seen!");
        rollbackTx();

        beginTx();
        message = consumer.receive(5000L);
        List<String> secondBody = assertReceivedObjectMessageWithListBody(message);
        assertNotSame("Second call should return a different body", secondBody, firstBody);
        commitTx();
    }

    /**
     * Asserts that {@code message} is a non-null {@link ObjectMessage} whose
     * {@code foo} property is {@code "abc"} and whose body is a list holding
     * exactly the element {@code "First"}.
     *
     * @param message the received message to check
     * @return the list taken from the message body
     * @throws JMSException if the message cannot be read
     */
    protected List<String> assertReceivedObjectMessageWithListBody(Message message) throws JMSException {
        assertNotNull("Should have received a message!", message);
        assertEquals("foo header", "abc", message.getStringProperty("foo"));
        assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage);

        // Element types cannot be checked at cast time; the assertion on element 0 covers it.
        @SuppressWarnings("unchecked")
        List<String> body = (List<String>) ((ObjectMessage) message).getObject();
        LOG.info("Received body: " + body);

        assertEquals("Size of list should be 1", 1, body.size());
        assertEquals("element 0 of list", "First", body.get(0));
        return body;
    }

    /**
     * Closes the current connection, if any, then opens and starts a new one
     * together with a fresh session, destination, producer and consumer.
     *
     * @throws Exception if closing or creating any resource fails
     */
    protected void reconnect() throws Exception {
        Connection previous = connection;
        if (previous != null) {
            previous.close();
        }
        // Clear the reference so reconnectSession() does not try to close the old session.
        session = null;

        connection = resourceProvider.createConnection(connectionFactory);
        reconnectSession();
        connection.start();
    }

    /**
     * Closes the current session, if any, and creates a new session,
     * destination, producer and consumer on the current connection.
     *
     * @throws JMSException if closing or creating any resource fails
     */
    protected void reconnectSession() throws JMSException {
        Session previous = session;
        if (previous != null) {
            previous.close();
        }

        session = resourceProvider.createSession(connection);
        destination = resourceProvider.createDestination(session, getSubject());
        producer = resourceProvider.createProducer(session, destination);
        consumer = resourceProvider.createConsumer(session, destination);
    }

    /**
     * Sets the queue, topic, durable-topic and optimized-durable-topic
     * prefetch limits of the current connection's policy to one.
     */
    protected void setPrefetchToOne() {
        // NOTE(review): the consumer created by reconnectSession() already exists when the
        // tests call this; presumably only consumers created afterwards see the new
        // limits. Confirm against ActiveMQPrefetchPolicy / consumer creation.
        ActiveMQPrefetchPolicy policy = getPrefetchPolicy();
        policy.setQueuePrefetch(1);
        policy.setTopicPrefetch(1);
        policy.setDurableTopicPrefetch(1);
        policy.setOptimizeDurableTopicPrefetch(1);
    }

    /**
     * Returns the prefetch policy of the current connection, which must be an
     * {@link ActiveMQConnection}.
     *
     * @return the connection's prefetch policy
     */
    protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
        ActiveMQConnection activeMqConnection = (ActiveMQConnection) connection;
        return activeMqConnection.getPrefetchPolicy();
    }

    /**
     * Sends {@code MESSAGE_COUNT} messages and consumes them through
     * {@link #onMessage(Message)}: the first delivery round is rolled back by
     * the listener, the second is committed. Afterwards no further message
     * may be delivered.
     *
     * @throws Exception if any JMS operation fails or a wait times out
     */
    public void testMessageListener() throws Exception {
        for (int i = 0; i < MESSAGE_COUNT; ++i) {
            producer.send(session.createTextMessage(MESSAGE_TEXT + i));
        }
        commitTx();

        consumer.setMessageListener(this);

        // First round: the listener collects the messages and rolls back.
        waitReceiveUnack();
        // Expected value goes first so that a failure reports the counts correctly.
        assertEquals("messages delivered before the rollback", MESSAGE_COUNT, unackMessages.size());

        // Second round: the same number of messages arrives again and is committed.
        waitReceiveAck();
        assertEquals("messages delivered after the rollback", MESSAGE_COUNT, ackMessages.size());

        // Nothing may be delivered once the second round is committed.
        consumer.setMessageListener(null);
        assertNull(consumer.receive(500L));
        reconnect();
    }

    /**
     * Listener callback used by {@link #testMessageListener()}. Until the
     * resend phase starts, messages are collected in {@code unackMessages} and
     * the transaction is rolled back once {@code MESSAGE_COUNT} have arrived;
     * after that, messages are collected in {@code ackMessages} and the
     * transaction is committed once {@code MESSAGE_COUNT} have arrived.
     * Failures are logged; the waiting test then fails on its own timeout.
     *
     * @param message the delivered message
     */
    public void onMessage(Message message) {
        if (!resendPhase) {
            unackMessages.add(message);
            if (unackMessages.size() == MESSAGE_COUNT) {
                try {
                    rollbackTx();
                    // Only switch phases when the rollback actually succeeded.
                    resendPhase = true;
                } catch (Exception e) {
                    LOG.error("Failed to roll back the first delivery round", e);
                }
            }
        } else {
            ackMessages.add(message);
            if (ackMessages.size() == MESSAGE_COUNT) {
                try {
                    commitTx();
                } catch (Exception e) {
                    LOG.error("Failed to commit the second delivery round", e);
                }
            }
        }
    }

    /**
     * Polls every 100 ms, at most 100 times, until the listener has entered
     * the resend phase, and asserts that it did.
     *
     * @throws Exception if the sleep is interrupted
     */
    private void waitReceiveUnack() throws Exception {
        int attempts = 0;
        while (attempts < 100 && !resendPhase) {
            Thread.sleep(100L);
            attempts++;
        }
        assertTrue(resendPhase);
    }

    /**
     * Polls every 100 ms, at most 100 times, until {@code ackMessages} holds
     * at least {@code MESSAGE_COUNT} messages, and asserts that it does.
     *
     * @throws Exception if the sleep is interrupted
     */
    private void waitReceiveAck() throws Exception {
        int attempts = 0;
        while (attempts < 100 && ackMessages.size() < MESSAGE_COUNT) {
            Thread.sleep(100L);
            attempts++;
        }
        assertFalse(ackMessages.size() < MESSAGE_COUNT);
    }
}

