/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.replication;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.impl.JournalFile;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.persistence.impl.journal.AbstractJournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeginMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageEndMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationResponseMessageV2;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;

public final class ReplicationManager
implements ActiveMQComponent {
    /** Logger shared by every instance of this class. */
    private static final Logger logger = Logger.getLogger(ReplicationManager.class);
    /** Handler installed on {@link #replicatingChannel} by {@code start()}; reacts to replication response packets. */
    private final ResponseHandler responseHandler = new ResponseHandler();
    /** Channel obtained from the remoting connection in the constructor; replication packets are sent on it. */
    private final Channel replicatingChannel;
    /** Lifecycle flag: set by {@code start()}, cleared at the end of {@code stop()}. */
    private boolean started;
    /** Gate checked before packets are created or sent; also read inside the tasks run on {@link #replicationStream}. */
    private volatile boolean enabled;
    /** Within this class this flag is only ever written ({@code set(true)} in {@code stop()}); it is never read here. */
    private final AtomicBoolean writable = new AtomicBoolean(true);
    /** Contexts added when a packet is sent and polled when a response arrives or when the tokens are cleared. */
    private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<OperationContext>();
    /** Factory passed to {@code OperationContextImpl.getContext} and used once to create {@link #replicationStream}. */
    private final ExecutorFactory executorFactory;
    /** Executor on which the send tasks and the flush actions are run. */
    private final Executor replicationStream;
    /** Listener registered on the remoting connection by {@code start()} and removed by {@code stop()}. */
    private SessionFailureListener failureListener;
    /** Connection supplied at construction; set to {@code null} by {@code stop()}. */
    private CoreRemotingConnection remotingConnection;
    /** Timeout passed to {@code blockUntilWritable} and used (in milliseconds) when awaiting a stream flush. */
    private final long timeout;
    /** Initial-synchronization timeout supplied at construction time. */
    private final long initialReplicationSyncTimeout;
    /** Starts as {@code true}; set to {@code false} at the end of {@code sendSynchronizationDone}. Returned by {@code isSynchronizing()}. */
    private volatile boolean inSync = true;
    /** Counted up before the "synchronization done" packet is sent and counted down by the response handler on acknowledgement. */
    private final ReusableLatch synchronizationIsFinishedAcknowledgement = new ReusableLatch(0);

    /**
     * Creates a manager bound to the given remoting connection.
     *
     * @param remotingConnection            connection from which the replication channel is obtained
     * @param timeout                       timeout used for flow control and for stream flushes
     * @param initialReplicationSyncTimeout timeout associated with the initial synchronization
     * @param executorFactory               factory used for operation contexts and for the replication stream executor
     */
    public ReplicationManager(CoreRemotingConnection remotingConnection, long timeout, long initialReplicationSyncTimeout, ExecutorFactory executorFactory) {
        // Plain value assignments first; they have no side effects.
        this.timeout = timeout;
        this.initialReplicationSyncTimeout = initialReplicationSyncTimeout;
        this.executorFactory = executorFactory;
        this.remotingConnection = remotingConnection;
        // Calls on collaborators keep their original relative order: channel first, then executor.
        this.replicatingChannel = remotingConnection.getChannel(ChannelImpl.CHANNEL_ID.REPLICATION.id, -1);
        this.replicationStream = executorFactory.getExecutor();
    }

    /**
     * Wraps the given record data in a {@link ReplicationAddMessage} and hands it to the
     * replication send path. Does nothing while replication is not enabled.
     */
    public void appendUpdateRecord(byte journalID, ADD_OPERATION_TYPE operation, long id, byte recordType, Persister persister, Object record) throws Exception {
        if (!this.enabled) {
            return;
        }
        Packet addMessage = new ReplicationAddMessage(journalID, operation, id, recordType, persister, record);
        this.sendReplicatePacket(addMessage);
    }

    /**
     * Sends a {@link ReplicationDeleteMessage} for the given journal and record id.
     * Does nothing while replication is not enabled.
     */
    public void appendDeleteRecord(byte journalID, long id) throws Exception {
        if (!this.enabled) {
            return;
        }
        Packet deleteMessage = new ReplicationDeleteMessage(journalID, id);
        this.sendReplicatePacket(deleteMessage);
    }

    /**
     * Wraps the given record data and transaction id in a {@link ReplicationAddTXMessage}
     * and hands it to the replication send path. Does nothing while replication is not enabled.
     */
    public void appendAddRecordTransactional(byte journalID, ADD_OPERATION_TYPE operation, long txID, long id, byte recordType, Persister persister, Object record) throws Exception {
        if (!this.enabled) {
            return;
        }
        Packet addTxMessage = new ReplicationAddTXMessage(journalID, operation, txID, id, recordType, persister, record);
        this.sendReplicatePacket(addTxMessage);
    }

    /**
     * Sends a {@link ReplicationCommitMessage} (rollback flag {@code false}) for the transaction.
     * Does nothing while replication is not enabled.
     *
     * @param sync   not used by this method
     * @param lineUp forwarded to the send path; controls whether the operation context is lined up
     */
    public void appendCommitRecord(byte journalID, long txID, boolean sync, boolean lineUp) throws Exception {
        if (!this.enabled) {
            return;
        }
        Packet commitMessage = new ReplicationCommitMessage(journalID, false, txID);
        this.sendReplicatePacket(commitMessage, lineUp);
    }

    /**
     * Sends a {@link ReplicationDeleteTXMessage} carrying the supplied encoded record.
     * Does nothing while replication is not enabled.
     */
    public void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception {
        if (!this.enabled) {
            return;
        }
        Packet deleteTxMessage = new ReplicationDeleteTXMessage(journalID, txID, id, record);
        this.sendReplicatePacket(deleteTxMessage);
    }

    /**
     * Sends a {@link ReplicationDeleteTXMessage} using the shared empty {@code NullEncoding}
     * instance as the record. Does nothing while replication is not enabled.
     */
    public void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception {
        if (!this.enabled) {
            return;
        }
        Packet deleteTxMessage = new ReplicationDeleteTXMessage(journalID, txID, id, NullEncoding.instance);
        this.sendReplicatePacket(deleteTxMessage);
    }

    /**
     * Sends a {@link ReplicationPrepareMessage} for the transaction with its encoded data.
     * Does nothing while replication is not enabled.
     */
    public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception {
        if (!this.enabled) {
            return;
        }
        Packet prepareMessage = new ReplicationPrepareMessage(journalID, txID, transactionData);
        this.sendReplicatePacket(prepareMessage);
    }

    /**
     * Sends a {@link ReplicationCommitMessage} with its rollback flag set to {@code true}.
     * Does nothing while replication is not enabled.
     */
    public void appendRollbackRecord(byte journalID, long txID) throws Exception {
        if (!this.enabled) {
            return;
        }
        Packet rollbackMessage = new ReplicationCommitMessage(journalID, true, txID);
        this.sendReplicatePacket(rollbackMessage);
    }

    /**
     * Sends a {@link ReplicationPageEventMessage} with its final flag set to {@code false}
     * for the given store and page number. Does nothing while replication is not enabled.
     */
    public void pageClosed(SimpleString storeName, int pageNumber) {
        if (!this.enabled) {
            return;
        }
        Packet pageEvent = new ReplicationPageEventMessage(storeName, pageNumber, false);
        this.sendReplicatePacket(pageEvent);
    }

    /**
     * Sends a {@link ReplicationPageEventMessage} with its final flag set to {@code true}
     * for the given store and page number. Does nothing while replication is not enabled.
     */
    public void pageDeleted(SimpleString storeName, int pageNumber) {
        if (!this.enabled) {
            return;
        }
        Packet pageEvent = new ReplicationPageEventMessage(storeName, pageNumber, true);
        this.sendReplicatePacket(pageEvent);
    }

    /**
     * Sends a {@link ReplicationPageWriteMessage} for the paged message and page number.
     * Does nothing while replication is not enabled.
     */
    public void pageWrite(PagedMessage message, int pageNumber) {
        if (!this.enabled) {
            return;
        }
        Packet pageWrite = new ReplicationPageWriteMessage(message, pageNumber);
        this.sendReplicatePacket(pageWrite);
    }

    /**
     * Sends a {@link ReplicationLargeMessageBeginMessage} for the given message id.
     * Does nothing while replication is not enabled.
     */
    public void largeMessageBegin(long messageId) {
        if (!this.enabled) {
            return;
        }
        Packet beginMessage = new ReplicationLargeMessageBeginMessage(messageId);
        this.sendReplicatePacket(beginMessage);
    }

    /**
     * Generates a fresh id from the storage manager and sends it, together with the message id,
     * in a {@link ReplicationLargeMessageEndMessage}. Does nothing while replication is not enabled.
     *
     * @param messageId      id of the large message
     * @param storageManager source of the generated pending record id
     */
    public void largeMessageDelete(Long messageId, JournalStorageManager storageManager) {
        if (!this.enabled) {
            return;
        }
        // The id is generated before the packet is built, exactly once per call.
        long pendingId = storageManager.generateID();
        Packet endMessage = new ReplicationLargeMessageEndMessage(messageId, pendingId);
        this.sendReplicatePacket(endMessage);
    }

    /**
     * Sends a {@link ReplicationLargeMessageWriteMessage} carrying {@code body} for the given message id.
     * Does nothing while replication is not enabled.
     */
    public void largeMessageWrite(long messageId, byte[] body) {
        if (!this.enabled) {
            return;
        }
        Packet writeMessage = new ReplicationLargeMessageWriteMessage(messageId, body);
        this.sendReplicatePacket(writeMessage);
    }

    /**
     * Reports the lifecycle flag while holding this instance's monitor.
     *
     * @return {@code true} once {@code start()} has run and until {@code stop()} clears the flag
     */
    public synchronized boolean isStarted() {
        return started;
    }

    /**
     * Installs the response handler on the replication channel, registers a failure listener on
     * the remoting connection and marks the manager as started and enabled.
     *
     * @throws IllegalStateException if the manager has already been started
     */
    public synchronized void start() throws ActiveMQException {
        if (this.started) {
            throw new IllegalStateException("ReplicationManager is already started");
        }
        this.replicatingChannel.setHandler(this.responseHandler);
        ReplicatedSessionFailureListener listener = new ReplicatedSessionFailureListener();
        this.failureListener = listener;
        this.remotingConnection.addFailureListener(listener);
        // Flags are flipped last, once the channel and the listener are wired.
        this.started = true;
        this.enabled = true;
    }

    /**
     * Stops replication: closes the replication channel, disables further sends, completes every
     * pending token and unregisters the failure listener.
     * <p>
     * Only the "was it started?" check runs under this instance's monitor; the remaining work,
     * including resetting {@code started}, runs outside of it.
     * </p>
     *
     * @throws Exception propagated from the calls made on the channel or connection
     */
    public void stop() throws Exception {
        synchronized (this) {
            if (!this.started) {
                logger.trace("Stopping being ignored as it hasn't been started");
                return;
            }
        }
        final Channel channel = this.replicatingChannel;
        if (channel != null) {
            channel.close();
            channel.getConnection().getTransportConnection().fireReady(true);
        }
        this.enabled = false;
        this.writable.set(true);
        this.clearReplicationTokens();
        final CoreRemotingConnection connection = this.remotingConnection;
        if (connection != null) {
            connection.removeFailureListener(this.failureListener);
        }
        this.remotingConnection = null;
        this.started = false;
    }

    /**
     * Drains the pending-token queue, calling {@code replicationDone()} on every context removed.
     * <p>
     * The queue is drained by polling until {@code poll()} returns {@code null} rather than by
     * testing {@code isEmpty()} first: another thread (for example the response handler) may
     * remove the last element between the two calls, which would otherwise hand a {@code null}
     * context to the callback.
     * </p>
     * A failure in one callback is logged and does not prevent the remaining tokens from being completed.
     */
    public void clearReplicationTokens() {
        logger.trace("clearReplicationTokens initiating");
        OperationContext ctx;
        while ((ctx = this.pendingTokens.poll()) != null) {
            logger.trace("Calling ctx.replicationDone()");
            try {
                ctx.replicationDone();
            }
            catch (Throwable e) {
                ActiveMQServerLogger.LOGGER.errorCompletingCallbackOnReplicationManager(e);
            }
        }
        logger.trace("clearReplicationTokens finished");
    }

    /**
     * Returns a snapshot of the contexts currently waiting in the pending-token queue.
     *
     * @return a new insertion-ordered set; changes to it do not affect the queue
     */
    public Set<OperationContext> getActiveTokens() {
        return new LinkedHashSet<OperationContext>(this.pendingTokens);
    }

    /**
     * Convenience overload that sends the packet with {@code lineUp} set to {@code true}.
     *
     * @return the operation context associated with the send, or {@code null} when replication is disabled
     */
    private OperationContext sendReplicatePacket(Packet packet) {
        final boolean lineUp = true;
        return this.sendReplicatePacket(packet, lineUp);
    }

    /**
     * Schedules {@code packet} for sending on the replication stream executor.
     * <p>
     * If replication is already disabled the packet is released and {@code null} is returned.
     * Otherwise the current operation context is obtained, optionally lined up, and a task is
     * submitted. That task re-checks the enabled flag when it runs: if still enabled it records
     * the context as pending, applies flow control and sends the packet; if not, it releases the
     * packet and marks the context as done.
     * </p>
     *
     * @param packet packet to send
     * @param lineUp whether {@code replicationLineUp()} is called on the context before scheduling
     * @return the operation context, or {@code null} when replication was disabled on entry
     */
    private OperationContext sendReplicatePacket(Packet packet, boolean lineUp) {
        if (!this.enabled) {
            packet.release();
            return null;
        }
        final OperationContext token = OperationContextImpl.getContext(this.executorFactory);
        if (lineUp) {
            token.replicationLineUp();
        }
        this.replicationStream.execute(() -> {
            if (!this.enabled) {
                // Replication was turned off after scheduling: drop the packet and complete the token.
                packet.release();
                token.replicationDone();
                return;
            }
            this.pendingTokens.add(token);
            this.flowControl(packet.expectedEncodeSize());
            this.replicatingChannel.send(packet);
        });
        return token;
    }

    /**
     * Blocks until the replication connection reports it can accept {@code size} bytes, bounded
     * by the configured timeout. When that fails, a slow-replication message is logged and the
     * manager is stopped; any exception raised while stopping is logged as a warning.
     *
     * @param size number of bytes about to be written
     * @return the result of {@code blockUntilWritable}
     */
    private boolean flowControl(int size) {
        final boolean unblocked = this.replicatingChannel.getConnection().blockUntilWritable(size, this.timeout);
        if (unblocked) {
            return true;
        }
        try {
            ActiveMQServerLogger.LOGGER.slowReplicationResponse();
            this.stop();
        }
        catch (Exception e) {
            logger.warn(e.getMessage(), e);
        }
        return false;
    }

    /**
     * Completes the oldest pending token; logs a warning when the queue holds none.
     */
    private void replicated() {
        final OperationContext head = this.pendingTokens.poll();
        if (head != null) {
            head.replicationDone();
            return;
        }
        logger.warn("Missing replication token on queue");
    }

    /**
     * Sends the content of a journal file over the replication channel.
     * <p>
     * Works on a clone of the journal file's sequential file, logs the file and its size, sends
     * it without a byte limit, and closes the clone afterwards if it is still open.
     * Does nothing while replication is not enabled.
     * </p>
     *
     * @param jf      journal file to send
     * @param content journal content type forwarded with every packet
     */
    public void syncJournalFile(JournalFile jf, AbstractJournalStorageManager.JournalContent content) throws Exception {
        if (!this.enabled) {
            return;
        }
        final SequentialFile clone = jf.getFile().cloneFile();
        try {
            ActiveMQServerLogger.LOGGER.replicaSyncFile(clone, clone.size());
            this.sendLargeFile(content, null, jf.getFileID(), clone, Long.MAX_VALUE);
        }
        finally {
            if (clone.isOpen()) {
                clone.close();
            }
        }
    }

    /**
     * Sends up to {@code size} bytes of a large-message file, identified by {@code id}.
     * Does nothing while replication is not enabled.
     */
    public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception {
        if (!this.enabled) {
            return;
        }
        this.sendLargeFile(null, null, id, file, size);
    }

    /**
     * Sends a page file for the given queue/store name without a byte limit.
     * Does nothing while replication is not enabled.
     */
    public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception {
        if (!this.enabled) {
            return;
        }
        this.sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
    }

    /**
     * Streams a file over the replication channel in chunks of 32768 bytes.
     * <p>
     * The file is opened if necessary, read through a {@link FileChannel}, and each chunk is
     * wrapped in a {@link ReplicationSyncFileMessage}. After every 10 packets, and once more at
     * the end, the replication stream is flushed so that packets do not pile up on the executor.
     * The loop sends a packet <em>before</em> testing the termination condition, so the last
     * packet carries the final read result (which may be {@code -1} or {@code 0}).
     * </p>
     * <p>
     * Each chunk uses a freshly allocated pooled direct buffer. If preparing or filling that
     * buffer fails, the buffer is released here before the exception propagates, because no
     * packet has taken ownership of it yet.
     * </p>
     *
     * @param content        journal content type, or {@code null}
     * @param pageStore      page store name, or {@code null}
     * @param id             identifier sent with every packet
     * @param file           file to stream; closed on exit if it is open
     * @param maxBytesToSend upper bound on the number of bytes announced to the receiver
     * @throws Exception on I/O failure or when a stream flush times out
     */
    private void sendLargeFile(AbstractJournalStorageManager.JournalContent content, SimpleString pageStore, long id, SequentialFile file, long maxBytesToSend) throws Exception {
        if (!this.enabled) {
            return;
        }
        if (!file.isOpen()) {
            file.open();
        }
        final int size = 32768;
        final int flowControlSize = 10;
        int packetsSent = 0;
        FlushAction action = new FlushAction();
        try {
            try (FileInputStream fis = new FileInputStream(file.getJavaFile());
                 FileChannel channel = fis.getChannel()) {
                int bytesRead;
                do {
                    ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
                    try {
                        buffer.clear();
                        ByteBuffer byteBuffer = buffer.writerIndex(size).readerIndex(0).nioBuffer();
                        bytesRead = channel.read(byteBuffer);
                    }
                    catch (Throwable t) {
                        // Nothing owns the buffer yet, so give it back to the pool before failing.
                        buffer.release();
                        throw t;
                    }
                    int toSend = bytesRead;
                    if (bytesRead > 0) {
                        if ((long)bytesRead >= maxBytesToSend) {
                            // Budget exhausted by this chunk: announce only what is left of it.
                            toSend = (int)maxBytesToSend;
                            maxBytesToSend = 0L;
                        } else {
                            maxBytesToSend -= (long)bytesRead;
                        }
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("sending " + buffer.writerIndex() + " bytes on file " + file.getFileName());
                    }
                    this.sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer), true);
                    packetsSent++;
                    if (packetsSent % flowControlSize == 0) {
                        this.flushReplicationStream(action);
                    }
                } while (bytesRead != -1 && bytesRead != 0 && maxBytesToSend != 0L);
            }
            this.flushReplicationStream(action);
        }
        finally {
            if (file.isOpen()) {
                file.close();
            }
        }
    }

    /**
     * Waits until every task submitted to the replication stream before this call has run.
     * <p>
     * The action's latch is reset, the action is submitted to the executor, and the caller
     * blocks until the action runs or the configured timeout (milliseconds) elapses.
     * </p>
     *
     * @param action reusable flush marker
     * @throws Exception the replication-synchronization-timeout exception when the wait times out
     */
    private void flushReplicationStream(FlushAction action) throws Exception {
        action.reset();
        this.replicationStream.execute(action);
        final boolean flushed = action.await(this.timeout, TimeUnit.MILLISECONDS);
        if (flushed) {
            return;
        }
        // NOTE(review): the wait above uses `timeout`, but the exception reports
        // `initialReplicationSyncTimeout` — confirm which value is intended.
        throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(this.initialReplicationSyncTimeout);
    }

    /**
     * Sends a {@link ReplicationStartSyncMessage} built from the data files, content type, node id
     * and auto-fail-back flag. Does nothing while replication is not enabled.
     */
    public void sendStartSyncMessage(JournalFile[] datafiles, AbstractJournalStorageManager.JournalContent contentType, String nodeID, boolean allowsAutoFailBack) throws ActiveMQException {
        if (!this.enabled) {
            return;
        }
        Packet startSync = new ReplicationStartSyncMessage(datafiles, contentType, nodeID, allowsAutoFailBack);
        this.sendReplicatePacket(startSync);
    }

    /**
     * Tells the other side that synchronization is complete and waits for its acknowledgement.
     * <p>
     * The acknowledgement latch is counted up, a {@link ReplicationStartSyncMessage} built from
     * {@code nodeID} is sent, and the caller blocks until the response handler counts the latch
     * down or the timeout elapses. On timeout the replication-synchronization-timeout exception
     * is thrown. Afterwards the manager is marked as no longer synchronizing.
     * </p>
     * <p>
     * If the waiting thread is interrupted, the interruption is logged and the thread's interrupt
     * status is restored so that callers can still observe it; the method then continues as before.
     * </p>
     * Does nothing while replication is not enabled.
     *
     * @param nodeID                        node id placed in the message
     * @param initialReplicationSyncTimeout how long to wait for the acknowledgement
     */
    public void sendSynchronizationDone(String nodeID, long initialReplicationSyncTimeout) {
        if (this.enabled) {
            if (logger.isTraceEnabled()) {
                logger.trace("sendSynchronizationDone ::" + nodeID + ", " + initialReplicationSyncTimeout);
            }
            this.synchronizationIsFinishedAcknowledgement.countUp();
            this.sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
            try {
                if (!this.synchronizationIsFinishedAcknowledgement.await(initialReplicationSyncTimeout)) {
                    logger.trace("sendSynchronizationDone wasn't finished in time");
                    throw ActiveMQMessageBundle.BUNDLE.replicationSynchronizationTimeout(initialReplicationSyncTimeout);
                }
            }
            catch (InterruptedException e) {
                logger.debug(e);
                // Do not swallow the interruption: restore the flag for whoever called us.
                Thread.currentThread().interrupt();
            }
            this.inSync = false;
            logger.trace("sendSynchronizationDone finished");
        }
    }

    /**
     * Sends the ids of the given large messages in a {@link ReplicationStartSyncMessage}.
     * The key set is copied first, regardless of whether replication is enabled; the packet is
     * only sent when it is.
     *
     * @param largeMessages map whose keys are the ids to send
     */
    public void sendLargeMessageIdListMessage(Map<Long, Pair<String, Long>> largeMessages) {
        final ArrayList<Long> ids = new ArrayList<Long>(largeMessages.keySet());
        if (!this.enabled) {
            return;
        }
        Packet idList = new ReplicationStartSyncMessage(ids);
        this.sendReplicatePacket(idList);
    }

    /**
     * Sends a {@link ReplicationLiveIsStoppingMessage} carrying {@code finalMessage}.
     * The request is logged at debug level before the enabled check and again when enabled.
     *
     * @param finalMessage value placed in the message
     * @return the operation context of the send, or {@code null} when replication is not enabled
     */
    public OperationContext sendLiveIsStopping(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) {
        logger.debug("LIVE IS STOPPING?!? message=" + finalMessage + " enabled=" + this.enabled);
        if (!this.enabled) {
            return null;
        }
        logger.debug("LIVE IS STOPPING?!? message=" + finalMessage + " " + this.enabled);
        Packet stopping = new ReplicationLiveIsStoppingMessage(finalMessage);
        return this.sendReplicatePacket(stopping);
    }

    /**
     * Returns the remoting connection held by this manager.
     *
     * @return the connection given at construction, or {@code null} after {@code stop()} has cleared it
     */
    public CoreRemotingConnection getBackupTransportConnection() {
        return remotingConnection;
    }

    /**
     * Reports the {@code inSync} flag.
     *
     * @return {@code true} until {@code sendSynchronizationDone} has set the flag to {@code false}
     */
    public boolean isSynchronizing() {
        return inSync;
    }

    private class FlushAction
    implements Runnable {
        ReusableLatch latch = new ReusableLatch(1);

        private FlushAction() {
        }

        public void reset() {
            this.latch.setCount(1);
        }

        public boolean await(long timeout, TimeUnit unit) throws Exception {
            return this.latch.await(timeout, unit);
        }

        @Override
        public void run() {
            this.latch.countDown();
        }
    }

    /**
     * Stateless {@link EncodingSupport} that encodes to nothing: its size is zero and both
     * {@code encode} and {@code decode} leave the buffer untouched. A single shared instance is
     * exposed through {@link #instance}.
     */
    private static final class NullEncoding
    implements EncodingSupport {
        /** The only instance; the constructor is private. */
        static final NullEncoding instance = new NullEncoding();

        private NullEncoding() {
        }

        /** @return always {@code 0} */
        public int getEncodeSize() {
            return 0;
        }

        /** Writes nothing to {@code buffer}. */
        public void encode(ActiveMQBuffer buffer) {
            // intentionally empty
        }

        /** Reads nothing from {@code buffer}. */
        public void decode(ActiveMQBuffer buffer) {
            // intentionally empty
        }
    }

    /**
     * Channel handler for replication responses. Packets whose type is {@code 90} or {@code -9}
     * complete the oldest pending token; a type {@code -9} packet is additionally cast to
     * {@link ReplicationResponseMessageV2} and, when it is flagged as the synchronization-finished
     * acknowledgement, counts the acknowledgement latch down. All other packets are ignored.
     */
    private final class ResponseHandler
    implements ChannelHandler {

        public void handlePacket(Packet packet) {
            final int type = packet.getType();
            final boolean isV2Response = type == -9;
            if (type != 90 && !isV2Response) {
                return;
            }
            ReplicationManager.this.replicated();
            if (isV2Response && ((ReplicationResponseMessageV2)packet).isSynchronizationIsFinishedAcknowledgement()) {
                ReplicationManager.this.synchronizationIsFinishedAcknowledgement.countDown();
            }
        }
    }

    /**
     * Failure listener that stops the enclosing manager when the remoting connection fails.
     * A {@code DISCONNECTED} failure is logged as a backup shutdown; any other type is logged as
     * a backup failure. Errors raised while stopping are logged and not rethrown.
     */
    private final class ReplicatedSessionFailureListener
    implements SessionFailureListener {

        public void connectionFailed(ActiveMQException me, boolean failedOver) {
            final boolean backupShutDown = me.getType() == ActiveMQExceptionType.DISCONNECTED;
            if (!backupShutDown) {
                ActiveMQServerLogger.LOGGER.replicationStopOnBackupFail(me);
            } else {
                ActiveMQServerLogger.LOGGER.replicationStopOnBackupShutdown();
            }
            try {
                ReplicationManager.this.stop();
            }
            catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorStoppingReplication(e);
            }
        }

        /** Delegates to the two-argument overload; the scale-down target is not used. */
        public void connectionFailed(ActiveMQException me, boolean failedOver, String scaleDownTargetNodeID) {
            connectionFailed(me, failedOver);
        }

        /** No action is taken before a reconnect. */
        public void beforeReconnect(ActiveMQException me) {
            // nothing to do
        }
    }

    /**
     * Distinguishes an update from an add, with a lossless mapping to and from a boolean
     * ({@code true} for {@link #UPDATE}, {@code false} for {@link #ADD}).
     * <p>
     * Nested enums are implicitly static, so the modifier is omitted; the boolean value is held
     * in a field instead of one anonymous subclass per constant.
     * </p>
     */
    public enum ADD_OPERATION_TYPE {
        UPDATE(true),
        ADD(false);

        /** Boolean form of this constant, as returned by {@link #toBoolean()}. */
        private final boolean update;

        ADD_OPERATION_TYPE(boolean update) {
            this.update = update;
        }

        /**
         * @return {@code true} for {@link #UPDATE}, {@code false} for {@link #ADD}
         */
        public boolean toBoolean() {
            return this.update;
        }

        /**
         * Inverse of {@link #toBoolean()}.
         *
         * @param isUpdate boolean form of the operation
         * @return {@link #UPDATE} when {@code isUpdate} is {@code true}, otherwise {@link #ADD}
         */
        public static ADD_OPERATION_TYPE toOperation(boolean isUpdate) {
            return isUpdate ? UPDATE : ADD;
        }
    }
}

