/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.jdbc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.jdbc.JDBCMessageStore;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCTopicMessageStore;
import org.apache.activemq.store.jdbc.TransactionContext;
import org.apache.activemq.store.memory.MemoryTransactionStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;

public class JdbcMemoryTransactionStore
extends MemoryTransactionStore {
    /**
     * Creates a transaction store on top of the given JDBC persistence adapter.
     *
     * @param jdbcPersistenceAdapter adapter handed to the superclass constructor
     *        together with the broker service returned by its
     *        {@code getBrokerService()}
     */
    public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
        super(jdbcPersistenceAdapter, jdbcPersistenceAdapter.getBrokerService());
    }

    /**
     * Moves an in-flight transaction into the prepared state.
     * <p>
     * The transaction is taken out of {@code inflightTransactions}; when nothing
     * is registered under {@code txid} the call does nothing. Otherwise every
     * queued add and then every queued ack is run inside a single
     * persistence-adapter transaction whose context carries the XID. Once that
     * transaction has committed, the XID is cleared from the context, each add
     * command is wrapped in a {@link CommitAddOutcome} and the transaction is
     * registered in {@code preparedTransactions}.
     *
     * @param txid identifier of the transaction to prepare; it is cast to
     *        {@link XATransactionId} once a matching in-flight transaction exists
     * @throws IOException if a command or the commit fails; the adapter
     *         transaction is rolled back before the exception is rethrown
     */
    @Override
    public void prepare(TransactionId txid) throws IOException {
        final MemoryTransactionStore.Tx inflight = (MemoryTransactionStore.Tx) this.inflightTransactions.remove(txid);
        if (inflight != null) {
            final ConnectionContext xaContext = new ConnectionContext();
            // NOTE(review): presumably the XID on the context makes the adapter
            // record these writes as pending the XA outcome - confirm in the adapter.
            xaContext.setXid((XATransactionId) txid);
            this.persistenceAdapter.beginTransaction(xaContext);
            try {
                // Adds are replayed first, acks second, all in the same adapter transaction.
                final Iterator<MemoryTransactionStore.AddMessageCommand> pendingAdds = inflight.messages.iterator();
                while (pendingAdds.hasNext()) {
                    pendingAdds.next().run(xaContext);
                }
                final Iterator<MemoryTransactionStore.RemoveMessageCommand> pendingAcks = inflight.acks.iterator();
                while (pendingAcks.hasNext()) {
                    pendingAcks.next().run(xaContext);
                }
                this.persistenceAdapter.commitTransaction(xaContext);
            } catch (IOException failure) {
                this.persistenceAdapter.rollbackTransaction(xaContext);
                throw failure;
            }
            xaContext.setXid(null);

            // Swap each add for the command that will finalise it when the
            // prepared transaction is later committed.
            final ArrayList<MemoryTransactionStore.AddMessageCommand> commitOutcomes = new ArrayList<MemoryTransactionStore.AddMessageCommand>();
            for (MemoryTransactionStore.AddMessageCommand preparedAdd : inflight.messages) {
                commitOutcomes.add(new CommitAddOutcome(preparedAdd));
            }
            inflight.messages = commitOutcomes;
            this.preparedTransactions.put(txid, inflight);
        }
    }

    /**
     * Rolls back the transaction identified by {@code txid}.
     * <p>
     * A transaction that is still in flight is simply removed from
     * {@code inflightTransactions}. A transaction that has already been prepared
     * is removed from {@code preparedTransactions} and its prepare-time work is
     * reversed inside one persistence-adapter transaction:
     * <ul>
     *   <li>for every prepared add, {@code commitRemove} is invoked with a
     *       standard ack for the message;</li>
     *   <li>every {@link LastAckCommand} is asked to roll itself back;</li>
     *   <li>for every other ack, {@code commitAdd} is invoked with the entry
     *       locator of the acked message as both sequence arguments, and a
     *       {@link RecoveredRemoveMessageCommand} additionally reports the
     *       message to its store through {@code trackRollbackAck}.</li>
     * </ul>
     * When no transaction is known under {@code txid} the call does nothing.
     *
     * @param txid identifier of the transaction to roll back
     * @throws IOException if the broker cannot be obtained from the broker
     *         service, or if undoing the prepared work fails (in which case the
     *         adapter transaction is rolled back before rethrowing)
     */
    @Override
    public void rollback(TransactionId txid) throws IOException {
        MemoryTransactionStore.Tx tx = (MemoryTransactionStore.Tx)this.inflightTransactions.remove(txid);
        if (tx != null) {
            // Still in flight: removing it from the map is all this method does.
            return;
        }
        tx = (MemoryTransactionStore.Tx)this.preparedTransactions.remove(txid);
        if (tx == null) {
            return;
        }

        ConnectionContext ctx = new ConnectionContext();
        try {
            if (this.brokerService != null) {
                ctx.setBroker(this.brokerService.getBroker());
            }
        }
        catch (Exception e) {
            throw new IOException(e.getMessage(), e);
        }
        this.persistenceAdapter.beginTransaction(ctx);
        try {
            // Typed iteration: the element type must be AddMessageCommand for
            // getMessage() to resolve.
            for (MemoryTransactionStore.AddMessageCommand addMessageCommand : tx.messages) {
                Message message = addMessageCommand.getMessage();
                // The ack type parameter is a byte, so the named constant is used
                // instead of a bare int literal.
                ((JDBCPersistenceAdapter)this.persistenceAdapter).commitRemove(ctx, new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, 1));
            }
            for (MemoryTransactionStore.RemoveMessageCommand removeMessageCommand : tx.acks) {
                if (removeMessageCommand instanceof LastAckCommand) {
                    ((LastAckCommand)removeMessageCommand).rollback(ctx);
                    continue;
                }
                MessageId messageId = removeMessageCommand.getMessageAck().getLastMessageId();
                long sequence = (Long)messageId.getEntryLocator();
                ((JDBCPersistenceAdapter)this.persistenceAdapter).commitAdd(ctx, messageId, sequence, sequence);
                if (removeMessageCommand instanceof RecoveredRemoveMessageCommand) {
                    ((JDBCMessageStore)removeMessageCommand.getMessageStore()).trackRollbackAck(((RecoveredRemoveMessageCommand)removeMessageCommand).getMessage());
                }
            }
        }
        catch (IOException e) {
            this.persistenceAdapter.rollbackTransaction(ctx);
            throw e;
        }
        this.persistenceAdapter.commitTransaction(ctx);
    }

    /**
     * Recovers prepared transactions.
     * <p>
     * The JDBC adapter is first asked to recover into this store, after which
     * the superclass recovery runs with the supplied listener.
     *
     * @param listener listener forwarded to the superclass recovery
     * @throws IOException if either recovery step fails
     */
    @Override
    public void recover(TransactionRecoveryListener listener) throws IOException {
        final JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) this.persistenceAdapter;
        jdbcAdapter.recover(this);
        super.recover(listener);
    }

    /**
     * Registers a recovered message add with its prepared transaction.
     * <p>
     * The bytes are unmarshalled with the adapter's wire format, {@code id} is
     * stored on the message id both as the future-or-sequence value and as the
     * entry locator, and a {@link CommitAddOutcome} without a message store is
     * added to the prepared transaction named by the message's transaction id.
     *
     * @param id value recorded on the message id (sequence and entry locator)
     * @param messageBytes marshalled form of the message
     * @throws IOException if unmarshalling fails
     */
    public void recoverAdd(long id, byte[] messageBytes) throws IOException {
        final JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) this.persistenceAdapter;
        final Message recovered = (Message) jdbcAdapter.getWireFormat().unmarshal(new ByteSequence(messageBytes));

        final MessageId recoveredId = recovered.getMessageId();
        recoveredId.setFutureOrSequenceLong(id);
        recoveredId.setEntryLocator(id);

        // The message store is not known at this point, hence the null.
        final MemoryTransactionStore.Tx preparedTx = this.getPreparedTx(recovered.getTransactionId());
        preparedTx.add(new CommitAddOutcome(null, recovered));
    }

    /**
     * Registers a recovered message ack with its prepared transaction.
     * <p>
     * The message bytes are unmarshalled with the adapter's wire format and
     * {@code id} is stored on the message id both as the future-or-sequence
     * value and as the entry locator. A standard ack for that single message is
     * then wrapped in a {@link RecoveredRemoveMessageCommand} and added to the
     * prepared transaction identified by {@code xid}. The command's message
     * store starts out {@code null} and is supplied later through
     * {@code setMessageStore}.
     *
     * @param id value recorded on the message id (sequence and entry locator)
     * @param xid encoded XA transaction id the ack belongs to
     * @param message marshalled form of the acked message
     * @throws IOException if unmarshalling fails
     */
    public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
        final Message msg = (Message)((JDBCPersistenceAdapter)this.persistenceAdapter).getWireFormat().unmarshal(new ByteSequence(message));
        msg.getMessageId().setFutureOrSequenceLong(id);
        msg.getMessageId().setEntryLocator(id);
        MemoryTransactionStore.Tx tx = this.getPreparedTx(new XATransactionId(xid));
        // The ack type parameter is a byte, so the named constant is used
        // instead of a bare int literal.
        final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1);
        tx.add(new RecoveredRemoveMessageCommand(){
            MessageStore messageStore = null;

            @Override
            public MessageAck getMessageAck() {
                return ack;
            }

            /** Completes the removal by delegating to the adapter's {@code commitRemove}. */
            @Override
            public void run(ConnectionContext context) throws IOException {
                ((JDBCPersistenceAdapter)JdbcMemoryTransactionStore.this.persistenceAdapter).commitRemove(context, ack);
            }

            @Override
            public Message getMessage() {
                return msg;
            }

            @Override
            public void setMessageStore(MessageStore messageStore) {
                this.messageStore = messageStore;
            }

            @Override
            public MessageStore getMessageStore() {
                return this.messageStore;
            }
        });
    }

    /**
     * Registers a recovered durable-subscription last-ack with its prepared
     * transaction.
     * <p>
     * The prepared transaction is looked up from {@code encodedXid}. The same
     * byte array is then read as a stream: one byte is skipped, a {@code long}
     * is read as the last-acked sequence and one further byte as the priority.
     * A {@link LastAckCommand} carrying those values is added to the
     * transaction; its topic store is unset until {@code setMessageStore} is
     * called.
     *
     * @param encodedXid encoded XA transaction id, also the source of the
     *        sequence and priority values
     * @param destination destination set on the ack and passed to
     *        {@code commitLastAck}
     * @param subName subscription name
     * @param clientId client id
     * @throws IOException declared for the operations used during recovery
     */
    public void recoverLastAck(byte[] encodedXid, final ActiveMQDestination destination, final String subName, final String clientId) throws IOException {
        final MemoryTransactionStore.Tx preparedTx = this.getPreparedTx(new XATransactionId(encodedXid));

        final DataByteArrayInputStream xidStream = new DataByteArrayInputStream(encodedXid);
        xidStream.skipBytes(1);
        final long lastAck = xidStream.readLong();
        final byte priority = xidStream.readByte();

        final MessageAck ack = new MessageAck();
        ack.setDestination(destination);

        final LastAckCommand recoveredLastAck = new LastAckCommand() {
            // Assigned through setMessageStore; null until then.
            JDBCTopicMessageStore topicStore;

            @Override
            public void setMessageStore(JDBCTopicMessageStore store) {
                this.topicStore = store;
            }

            @Override
            public MessageStore getMessageStore() {
                return this.topicStore;
            }

            @Override
            public MessageAck getMessageAck() {
                return ack;
            }

            @Override
            public String getClientId() {
                return clientId;
            }

            @Override
            public String getSubName() {
                return subName;
            }

            @Override
            public long getSequence() {
                return lastAck;
            }

            @Override
            public byte getPriority() {
                return priority;
            }

            /** Commits the last-ack through the adapter, then completes it on the topic store. */
            @Override
            public void run(ConnectionContext context) throws IOException {
                final JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) JdbcMemoryTransactionStore.this.persistenceAdapter;
                jdbcAdapter.commitLastAck(context, lastAck, priority, destination, subName, clientId);
                this.topicStore.complete(clientId, subName);
            }

            /** Rolls the last-ack back through the adapter, then completes it on the topic store. */
            @Override
            public void rollback(ConnectionContext context) throws IOException {
                final JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) JdbcMemoryTransactionStore.this.persistenceAdapter;
                // Priority 0 is used when the store does not prioritise messages.
                final byte rollbackPriority;
                if (this.topicStore.isPrioritizedMessages()) {
                    rollbackPriority = priority;
                } else {
                    rollbackPriority = (byte) 0;
                }
                jdbcAdapter.rollbackLastAck(context, rollbackPriority, this.topicStore.getDestination(), subName, clientId);
                this.topicStore.complete(clientId, subName);
            }
        };
        preparedTx.add(recoveredLastAck);
    }

    /**
     * Attaches message stores to the commands of a recovered transaction.
     * <p>
     * For each {@link LastAckCommand} the topic store of the ack's destination
     * is looked up, told about the pending completion (client id, subscription
     * name, sequence, priority) and set on the command. Every other ack is
     * treated as a {@link RecoveredRemoveMessageCommand} and given the store of
     * its ack's destination. Each add command is given the store of its
     * message's destination.
     *
     * @param tx the recovered transaction whose commands are updated
     */
    @Override
    protected void onRecovered(MemoryTransactionStore.Tx tx) {
        for (MemoryTransactionStore.RemoveMessageCommand ackCommand : tx.acks) {
            if (ackCommand instanceof LastAckCommand) {
                final LastAckCommand lastAck = (LastAckCommand) ackCommand;
                final JDBCTopicMessageStore topicStore = (JDBCTopicMessageStore) this.findMessageStore(lastAck.getMessageAck().getDestination());
                topicStore.pendingCompletion(lastAck.getClientId(), lastAck.getSubName(), lastAck.getSequence(), lastAck.getPriority());
                lastAck.setMessageStore(topicStore);
            } else {
                final RecoveredRemoveMessageCommand recoveredAck = (RecoveredRemoveMessageCommand) ackCommand;
                recoveredAck.setMessageStore(this.findMessageStore(ackCommand.getMessageAck().getDestination()));
            }
        }

        for (MemoryTransactionStore.AddMessageCommand addCommand : tx.messages) {
            final ActiveMQDestination target = addCommand.getMessage().getDestination();
            addCommand.setMessageStore(this.findMessageStore(target));
        }
    }

    /**
     * Returns the delegate of the message store for {@code destination}.
     * <p>
     * The persistence adapter is asked for a queue store when the destination
     * is a queue and for a topic store otherwise; the result is cast to
     * {@link ProxyMessageStore} and its delegate is returned.
     *
     * @param destination destination whose store is wanted
     * @return the delegate of the proxy store returned by the adapter
     * @throws RuntimeException wrapping the {@link IOException} if the adapter
     *         fails to find or create the store
     */
    private MessageStore findMessageStore(ActiveMQDestination destination) {
        final ProxyMessageStore proxy;
        try {
            if (destination.isQueue()) {
                proxy = (ProxyMessageStore) this.persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination);
            } else {
                proxy = (ProxyMessageStore) this.persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
            }
        } catch (IOException error) {
            throw new RuntimeException("Failed to find/create message store for destination: " + destination, error);
        }
        return proxy.getDelegate();
    }

    /**
     * Acknowledges a message for a durable subscription.
     * <p>
     * An ack that is not part of a transaction is passed straight to the topic
     * store with a {@code null} context. A transactional ack is instead queued
     * on its transaction as a {@link LastAckCommand} that performs the store
     * acknowledge when run, and that can undo itself through the adapter's
     * {@code rollbackLastAck}.
     *
     * @param topicMessageStore store that owns the subscription
     * @param clientId client id of the subscriber
     * @param subscriptionName name of the subscription
     * @param messageId id of the acknowledged message
     * @param ack the acknowledgement
     * @throws IOException if the direct acknowledge fails
     */
    @Override
    public void acknowledge(final TopicMessageStore topicMessageStore, final String clientId, final String subscriptionName, final MessageId messageId, final MessageAck ack) throws IOException {
        if (!ack.isInTransaction()) {
            topicMessageStore.acknowledge(null, clientId, subscriptionName, messageId, ack);
            return;
        }

        final MemoryTransactionStore.Tx owningTx = this.getTx(ack.getTransactionId());
        final LastAckCommand deferredAck = new LastAckCommand() {

            @Override
            public MessageStore getMessageStore() {
                return topicMessageStore;
            }

            @Override
            public MessageAck getMessageAck() {
                return ack;
            }

            @Override
            public String getClientId() {
                return clientId;
            }

            @Override
            public String getSubName() {
                return subscriptionName;
            }

            /** Performs the deferred acknowledge against the topic store. */
            @Override
            public void run(ConnectionContext ctx) throws IOException {
                topicMessageStore.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
            }

            /** Undoes the ack through the adapter, then completes it on the topic store. */
            @Override
            public void rollback(ConnectionContext context) throws IOException {
                final JDBCTopicMessageStore jdbcTopicStore = (JDBCTopicMessageStore) topicMessageStore;
                final JDBCPersistenceAdapter jdbcAdapter = (JDBCPersistenceAdapter) JdbcMemoryTransactionStore.this.persistenceAdapter;
                jdbcAdapter.rollbackLastAck(context, jdbcTopicStore, ack, subscriptionName, clientId);
                jdbcTopicStore.complete(clientId, subscriptionName);
            }

            // The three members below are only meaningful for recovered
            // commands; this live command rejects them.

            @Override
            public long getSequence() {
                throw new IllegalStateException("Sequence id must be inferred from ack");
            }

            @Override
            public byte getPriority() {
                throw new IllegalStateException("Priority must be inferred from ack or row");
            }

            @Override
            public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
                throw new IllegalStateException("message store already known!");
            }
        };
        owningTx.add(deferredAck);
    }

    /**
     * Remove command for a durable-subscription ack that can also be rolled
     * back after the transaction has been prepared.
     */
    interface LastAckCommand extends MemoryTransactionStore.RemoveMessageCommand {

        /**
         * Undoes this ack using the supplied context.
         *
         * @param context connection context of the rollback transaction
         * @throws IOException if the rollback fails
         */
        void rollback(ConnectionContext context) throws IOException;

        /** @return the client id of the subscriber */
        String getClientId();

        /** @return the subscription name */
        String getSubName();

        /**
         * @return the sequence associated with this ack; an implementation that
         *         does not track it may throw {@link IllegalStateException}
         */
        long getSequence();

        /**
         * @return the priority associated with this ack; an implementation that
         *         does not track it may throw {@link IllegalStateException}
         */
        byte getPriority();

        /**
         * Supplies the topic store this command operates on.
         *
         * @param jdbcTopicMessageStore the store to use
         */
        void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore);
    }

    /**
     * Remove command rebuilt during recovery; it exposes the acked message and
     * lets the message store be attached after construction.
     */
    interface RecoveredRemoveMessageCommand extends MemoryTransactionStore.RemoveMessageCommand {

        /** @return the message this command acknowledges */
        Message getMessage();

        /**
         * Attaches the message store for this command.
         *
         * @param messageStore the store to use
         */
        void setMessageStore(MessageStore messageStore);
    }

    /**
     * Add command that finalises a message add which was already written during
     * prepare (or rebuilt during recovery). Running it assigns the message a new
     * sequence id and calls the adapter's {@code commitAdd}.
     */
    class CommitAddOutcome implements MemoryTransactionStore.AddMessageCommand {
        final Message message;
        JDBCMessageStore jdbcMessageStore;

        /**
         * Wraps an existing add command, reusing its message store and message.
         *
         * @param addMessageCommand command whose store must be a {@link JDBCMessageStore}
         */
        public CommitAddOutcome(MemoryTransactionStore.AddMessageCommand addMessageCommand) {
            this((JDBCMessageStore) addMessageCommand.getMessageStore(), addMessageCommand.getMessage());
        }

        /**
         * @param jdbcMessageStore store the message belongs to; may be supplied
         *        later through {@link #setMessageStore(MessageStore)}
         * @param message the message being added
         */
        public CommitAddOutcome(JDBCMessageStore jdbcMessageStore, Message message) {
            this.jdbcMessageStore = jdbcMessageStore;
            this.message = message;
        }

        @Override
        public Message getMessage() {
            return this.message;
        }

        @Override
        public MessageStore getMessageStore() {
            return this.jdbcMessageStore;
        }

        @Override
        public void setMessageStore(MessageStore messageStore) {
            this.jdbcMessageStore = (JDBCMessageStore) messageStore;
        }

        /**
         * Commits the add.
         * <p>
         * While holding the monitor of the store's {@code pendingAdditions}, a
         * new sequence id is obtained, a completion callback is registered on
         * the transaction context that writes that id onto the message id, and
         * the store's index listener (if any) is notified. Afterwards
         * {@code commitAdd} is called with the entry locator read at the start
         * of this method and the new sequence, and the store's {@code onAdd} is
         * invoked.
         *
         * @param context connection context of the committing transaction
         * @throws IOException if the adapter calls fail
         */
        @Override
        public void run(ConnectionContext context) throws IOException {
            final JDBCPersistenceAdapter adapter = (JDBCPersistenceAdapter) JdbcMemoryTransactionStore.this.persistenceAdapter;
            // Captured before the completion callback can overwrite the locator.
            final Long sequenceAtPrepare = (Long) this.message.getMessageId().getEntryLocator();
            final TransactionContext txContext = adapter.getTransactionContext(context);

            final long assignedSequence;
            synchronized (this.jdbcMessageStore.pendingAdditions) {
                assignedSequence = adapter.getNextSequenceId();
                txContext.onCompletion(new Runnable() {

                    @Override
                    public void run() {
                        CommitAddOutcome.this.message.getMessageId().setEntryLocator(assignedSequence);
                        CommitAddOutcome.this.message.getMessageId().setFutureOrSequenceLong(assignedSequence);
                    }
                });
                if (this.jdbcMessageStore.getIndexListener() != null) {
                    final IndexListener.MessageContext indexContext = new IndexListener.MessageContext(context, this.message, null);
                    this.jdbcMessageStore.getIndexListener().onAdd(indexContext);
                }
            }

            adapter.commitAdd(context, this.message.getMessageId(), sequenceAtPrepare, assignedSequence);
            this.jdbcMessageStore.onAdd(this.message, (Long) this.message.getMessageId().getEntryLocator(), this.message.getPriority());
        }
    }
}

