package org.hornetq.core.server.cluster.impl;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SendAcknowledgementHandler;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.Transformer;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUID;

/* loaded from: input_file:org/hornetq/core/server/cluster/impl/BridgeImpl.class */
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler {
    private static final Logger log = Logger.getLogger(BridgeImpl.class);
    private static final boolean isTrace = log.isTraceEnabled();
    private static final SimpleString JMS_QUEUE_ADDRESS_PREFIX = new SimpleString(ResourceNames.JMS_QUEUE);
    private static final SimpleString JMS_TOPIC_ADDRESS_PREFIX = new SimpleString(ResourceNames.JMS_TOPIC);
    protected final ServerLocatorInternal serverLocator;
    private final UUID nodeUUID;
    private final SimpleString name;
    private final Queue queue;
    protected final Executor executor;
    private final Filter filter;
    private final SimpleString forwardingAddress;
    private final java.util.Queue<MessageReference> refs = new ConcurrentLinkedQueue();
    private final Transformer transformer;
    private volatile ClientSessionFactory csf;
    protected volatile ClientSessionInternal session;
    private volatile ClientProducer producer;
    private volatile boolean started;
    private final boolean useDuplicateDetection;
    private volatile boolean active;
    private volatile boolean stopping;
    private final String user;
    private final String password;
    private boolean activated;
    private NotificationService notificationService;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/hornetq/core/server/cluster/impl/BridgeImpl$CreateObjectsRunnable.class */
    public class CreateObjectsRunnable implements Runnable {
        private CreateObjectsRunnable() {
        }

        @Override // java.lang.Runnable
        public synchronized void run() {
            if (BridgeImpl.this.createObjects()) {
                return;
            }
            BridgeImpl.this.active = false;
            BridgeImpl.this.started = false;
        }
    }

    /* loaded from: input_file:org/hornetq/core/server/cluster/impl/BridgeImpl$StopRunnable.class */
    private class StopRunnable implements Runnable {
        private StopRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                synchronized (BridgeImpl.this) {
                    if (BridgeImpl.this.started) {
                        BridgeImpl.log.debug("Closing Session for bridge " + ((Object) BridgeImpl.this.name));
                        if (BridgeImpl.this.session != null) {
                            BridgeImpl.this.session.close();
                        }
                        BridgeImpl.this.started = false;
                        BridgeImpl.this.active = false;
                        BridgeImpl.this.queue.removeConsumer(BridgeImpl.this);
                        BridgeImpl.this.cancelRefs();
                        if (BridgeImpl.this.queue != null) {
                            BridgeImpl.this.queue.deliverAsync();
                        }
                        BridgeImpl.log.info("stopped bridge " + ((Object) BridgeImpl.this.name));
                    }
                }
            } catch (Exception e) {
                BridgeImpl.log.error("Failed to stop bridge", e);
            }
        }
    }

    public BridgeImpl(ServerLocatorInternal serverLocatorInternal, UUID uuid, SimpleString simpleString, Queue queue, Executor executor, SimpleString simpleString2, SimpleString simpleString3, ScheduledExecutorService scheduledExecutorService, Transformer transformer, boolean z, String str, String str2, boolean z2, StorageManager storageManager) throws Exception {
        this.serverLocator = serverLocatorInternal;
        this.nodeUUID = uuid;
        this.name = simpleString;
        this.queue = queue;
        this.executor = executor;
        this.filter = FilterImpl.createFilter(simpleString2);
        this.forwardingAddress = simpleString3;
        this.transformer = transformer;
        this.useDuplicateDetection = z;
        this.user = str;
        this.password = str2;
        this.activated = z2;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public void setNotificationService(NotificationService notificationService) {
        this.notificationService = notificationService;
    }

    @Override // org.hornetq.core.server.HornetQComponent
    public synchronized void start() throws Exception {
        if (this.started) {
            return;
        }
        this.started = true;
        if (this.activated) {
            activate();
        }
        if (this.notificationService != null) {
            TypedProperties typedProperties = new TypedProperties();
            typedProperties.putSimpleStringProperty(new SimpleString("name"), this.name);
            this.notificationService.sendNotification(new Notification(this.nodeUUID.toString(), NotificationType.BRIDGE_STARTED, typedProperties));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cancelRefs() throws Exception {
        LinkedList linkedList = new LinkedList();
        while (true) {
            MessageReference poll = this.refs.poll();
            if (poll == null) {
                break;
            } else {
                linkedList.addFirst(poll);
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        Iterator it = linkedList.iterator();
        while (it.hasNext()) {
            MessageReference messageReference = (MessageReference) it.next();
            messageReference.getQueue().cancel(messageReference, currentTimeMillis);
        }
    }

    @Override // org.hornetq.core.server.HornetQComponent
    public void stop() throws Exception {
        if (this.started && this.csf != null) {
            this.csf.close();
        }
        log.info("Bridge " + ((Object) this.name) + " being stopped");
        this.stopping = true;
        this.executor.execute(new StopRunnable());
        waitForRunnablesToComplete();
        if (this.notificationService != null) {
            TypedProperties typedProperties = new TypedProperties();
            typedProperties.putSimpleStringProperty(new SimpleString("name"), this.name);
            try {
                this.notificationService.sendNotification(new Notification(this.nodeUUID.toString(), NotificationType.BRIDGE_STOPPED, typedProperties));
            } catch (Exception e) {
                log.warn("unable to send notification when broadcast group is stopped", e);
            }
        }
    }

    @Override // org.hornetq.core.server.HornetQComponent
    public boolean isStarted() {
        return this.started;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public synchronized void activate() {
        this.activated = true;
        this.executor.execute(new CreateObjectsRunnable());
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public SimpleString getName() {
        return this.name;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public Queue getQueue() {
        return this.queue;
    }

    @Override // org.hornetq.core.server.Consumer
    public Filter getFilter() {
        return this.filter;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public SimpleString getForwardingAddress() {
        return this.forwardingAddress;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public Transformer getTransformer() {
        return this.transformer;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public boolean isUseDuplicateDetection() {
        return this.useDuplicateDetection;
    }

    @Override // org.hornetq.core.server.cluster.Bridge
    public RemotingConnection getForwardingConnection() {
        if (this.session == null) {
            return null;
        }
        return this.session.getConnection();
    }

    @Override // org.hornetq.api.core.client.SendAcknowledgementHandler
    public void sendAcknowledged(Message message) {
        try {
            MessageReference poll = this.refs.poll();
            if (poll != null) {
                poll.getQueue().acknowledge(poll);
            }
        } catch (Exception e) {
            log.error("Failed to ack", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ServerMessage beforeForward(ServerMessage serverMessage) {
        if (this.useDuplicateDetection) {
            serverMessage.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, getDuplicateBytes(this.nodeUUID, serverMessage.getMessageID()));
        }
        if (this.transformer != null) {
            serverMessage = this.transformer.transform(serverMessage);
        }
        return serverMessage;
    }

    public static byte[] getDuplicateBytes(UUID uuid, long j) {
        byte[] bArr = new byte[24];
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        wrap.put(uuid.asBytes());
        wrap.putLong(j);
        return bArr;
    }

    @Override // org.hornetq.core.server.Consumer
    public HandleStatus handle(MessageReference messageReference) throws Exception {
        if (this.filter != null && !this.filter.match(messageReference.getMessage())) {
            return HandleStatus.NO_MATCH;
        }
        synchronized (this) {
            if (!this.active) {
                log.debug(((Object) this.name) + "::Ignoring reference on bridge as it is set to iniactive ref=" + messageReference);
                return HandleStatus.BUSY;
            }
            if (isTrace) {
                log.trace("Bridge " + ((Object) this.name) + " is handling reference=" + messageReference);
            }
            messageReference.handled();
            ServerMessage message = messageReference.getMessage();
            this.refs.add(messageReference);
            ServerMessage beforeForward = beforeForward(message);
            try {
                this.producer.send(this.forwardingAddress != null ? this.forwardingAddress : beforeForward.getAddress(), beforeForward);
                return HandleStatus.HANDLED;
            } catch (HornetQException e) {
                log.warn("Unable to send message, will try again once bridge reconnects", e);
                this.refs.remove(messageReference);
                return HandleStatus.BUSY;
            }
        }
    }

    @Override // org.hornetq.core.remoting.FailureListener
    public void connectionFailed(HornetQException hornetQException, boolean z) {
        log.warn(((Object) this.name) + "::Connection failed with failedOver=" + z, hornetQException);
        fail(false);
    }

    @Override // org.hornetq.api.core.client.SessionFailureListener
    public void beforeReconnect(HornetQException hornetQException) {
        log.warn(((Object) this.name) + "::Connection failed before reconnect ", hornetQException);
        fail(true);
    }

    private void waitForRunnablesToComplete() {
        Future future = new Future();
        this.executor.execute(future);
        if (future.await(10000L)) {
            return;
        }
        log.warn("Timed out waiting to stop");
    }

    private void fail(boolean z) {
        log.debug(((Object) this.name) + "::BridgeImpl::fail being called, beforeReconnect=" + z);
        if (this.session.getConnection().isDestroyed()) {
            log.debug(((Object) this.name) + "::Connection is destroyed, active = false now");
            this.active = false;
        }
        try {
            if (!this.session.getConnection().isDestroyed()) {
                if (z) {
                    synchronized (this) {
                        log.debug(((Object) this.name) + "::Connection is destroyed, active = false now");
                    }
                    cancelRefs();
                } else {
                    afterConnect();
                    log.debug(((Object) this.name) + "::After reconnect, setting active=true now");
                    this.active = true;
                    if (this.queue != null) {
                        this.queue.deliverAsync();
                    }
                }
            }
        } catch (Exception e) {
            log.error("Failed to cancel refs", e);
        }
    }

    protected void afterConnect() throws Exception {
    }

    protected ClientSessionFactory createSessionFactory() throws Exception {
        return this.serverLocator.createSessionFactory();
    }

    protected synchronized boolean createObjects() {
        boolean z;
        ClientSession.BindingQuery bindingQuery;
        if (!this.started) {
            return false;
        }
        int i = 0;
        do {
            log.info("Connecting bridge " + ((Object) this.name) + " to its destination [" + this.nodeUUID.toString() + "]");
            try {
                this.csf = createSessionFactory();
                this.session = (ClientSessionInternal) this.csf.createSession(this.user, this.password, false, true, true, true, 1);
            } catch (HornetQException e) {
                if (this.csf != null) {
                    this.csf.close();
                }
                if (e.getCode() != 112) {
                    log.warn("Bridge " + ((Object) this.name) + " is unable to connect to destination. It will be disabled.", e);
                    return false;
                }
                log.warn("Server is starting, retry to create the session for bridge " + ((Object) this.name));
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e2) {
                }
                z = true;
            } catch (Exception e3) {
                log.warn("Bridge " + ((Object) this.name) + " is unable to connect to destination. It will be disabled.", e3);
                return false;
            }
            if (this.forwardingAddress != null) {
                try {
                    bindingQuery = this.session.bindingQuery(this.forwardingAddress);
                } catch (Throwable th) {
                    log.warn("Error on querying binding. Retrying", th);
                    z = true;
                    Thread.sleep(100L);
                }
                if (this.forwardingAddress.startsWith(JMS_QUEUE_ADDRESS_PREFIX) || this.forwardingAddress.startsWith(JMS_TOPIC_ADDRESS_PREFIX)) {
                    if (!bindingQuery.isExists()) {
                        i++;
                        if (this.serverLocator.getReconnectAttempts() > 0 && i > this.serverLocator.getReconnectAttempts()) {
                            log.warn("Retried " + ((Object) this.forwardingAddress) + " up to the configured reconnectAttempts(" + this.serverLocator.getReconnectAttempts() + "). Giving up now. The bridge " + ((Object) getName()) + " will not be activated");
                            return false;
                        }
                        log.warn("Address " + ((Object) this.forwardingAddress) + " doesn't have any bindings yet, retry #(" + i + ")");
                        Thread.sleep(this.serverLocator.getRetryInterval());
                        z = true;
                        this.csf.close();
                        this.session.close();
                        if (!z) {
                            return false;
                        }
                    }
                } else if (!bindingQuery.isExists()) {
                    log.info("Bridge " + ((Object) getName()) + " connected to fowardingAddress=" + ((Object) getForwardingAddress()) + ". " + ((Object) getForwardingAddress()) + " doesn't have any bindings what means messages will be ignored until a binding is created.");
                }
            }
            if (this.session == null) {
                return false;
            }
            this.producer = this.session.createProducer();
            this.session.addFailureListener(this);
            this.session.setSendAcknowledgementHandler(this);
            afterConnect();
            this.active = true;
            this.queue.addConsumer(this);
            this.queue.deliverAsync();
            log.info("Bridge " + ((Object) this.name) + " is connected [" + this.nodeUUID + "-> " + ((Object) this.name) + "]");
            return true;
        } while (!this.stopping);
        return false;
    }
}
