/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.server.federation;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
import org.apache.activemq.artemis.core.config.FederationConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationDownstreamConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationStreamConfiguration;
import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.ChannelImpl;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationDownstreamConnectMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.FederationStreamConnectMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.federation.AbstractFederationStream;
import org.apache.activemq.artemis.core.server.federation.FederatedQueueConsumer;
import org.apache.activemq.artemis.core.server.federation.Federation;
import org.apache.activemq.artemis.core.server.federation.FederationConnection;

/**
 * Federation stream that asks a remote broker to federate back to this one.
 *
 * <p>On {@link #deploy(FederationConfiguration)} a
 * {@link FederationDownstreamConnectMessage} is built from the federation
 * configuration and sent over the federation channel of a client session opened
 * through the stream's {@link FederationConnection}. Connection attempts run on
 * the server's scheduled pool and are retried with an increasing delay until
 * they succeed or the stream is undeployed.
 *
 * <p>The instance registers itself as a {@link SessionFailureListener} on the
 * session it creates; on connection failure it tears the session down and calls
 * {@link #start()} again.
 */
public class FederationDownstream extends AbstractFederationStream implements SessionFailureListener {

    /** Session metadata key under which the downstream name is published. */
    public static final String FEDERATION_DOWNSTREAM_NAME = "federation-downstream-name";

    /** Multiplier handed to {@code FederatedQueueConsumer.getNextDelay} between connect retries. */
    private static final int INITIAL_CONNECT_DELAY_MULTIPLIER = 2;

    /** Maximum handed to {@code FederatedQueueConsumer.getNextDelay}; delays are scheduled in seconds. */
    private static final int INITIAL_CONNECT_DELAY_MAX = 30;

    private final FederationDownstreamConfiguration config;
    private final ScheduledExecutorService scheduledExecutorService;

    /** True once the connect message has been handed to the channel for the current session. */
    private final AtomicBoolean initialized = new AtomicBoolean();

    /** True between a successful {@link #deploy} and the next {@link #undeploy} or connection failure. */
    private final AtomicBoolean started = new AtomicBoolean();

    private ClientSessionFactoryInternal clientSessionFactory;
    private ClientSessionInternal clientSession;
    private Channel channel;

    /** Last configuration passed to {@link #deploy}; {@code null} until the first deploy. */
    private FederationConfiguration federationConfiguration;

    /**
     * Creates the downstream; connection attempts are scheduled on {@code server.getScheduledPool()}.
     *
     * @param server     owning broker
     * @param federation federation this stream belongs to
     * @param name       stream name, passed to the superclass
     * @param config     downstream configuration
     * @param connection connection used to reach the remote broker
     */
    public FederationDownstream(ActiveMQServer server, Federation federation, String name, FederationDownstreamConfiguration config, FederationConnection connection) {
        super(server, federation, name, config, connection);
        this.config = config;
        this.scheduledExecutorService = server.getScheduledPool();
    }

    /**
     * Starts the stream, notifies the stream-started plugins and re-deploys the
     * last known federation configuration, if there is one.
     *
     * @throws IllegalStateException if deployment fails with an {@link ActiveMQException}
     */
    @Override
    public synchronized void start() {
        super.start();
        this.callFederationStreamStartedPlugins();
        if (this.federationConfiguration == null) {
            // Nothing has been deployed yet, so there is nothing to re-deploy;
            // deploy(null) would dereference the missing configuration.
            return;
        }
        try {
            this.deploy(this.federationConfiguration);
        } catch (ActiveMQException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /** Stops the stream and notifies the stream-stopped plugins. */
    @Override
    public synchronized void stop() {
        super.stop();
        this.callFederationStreamStoppedPlugins();
    }

    /**
     * Remembers {@code federationConfiguration} and, if the connection is started
     * and this stream is not already deployed, schedules the downstream connect.
     *
     * <p>If deployment fails the {@code started} flag is rolled back so that a
     * later call can try again.
     *
     * @param federationConfiguration configuration to send to the remote broker
     * @throws ActiveMQException if the referenced upstream connector cannot be
     *                           found or the connect task cannot be scheduled
     */
    public void deploy(FederationConfiguration federationConfiguration) throws ActiveMQException {
        this.federationConfiguration = federationConfiguration;
        if (!this.connection.isStarted() || !this.started.compareAndSet(false, true)) {
            return;
        }

        boolean deployed = false;
        try {
            FederationDownstreamConnectMessage message = new FederationDownstreamConnectMessage();
            message.setName(federationConfiguration.getName());
            message.setCredentials(federationConfiguration.getCredentials());
            message.setStreamConfiguration(this.config);
            message.setFederationPolicyMap(federationConfiguration.getFederationPolicyMap());
            message.setTransformerConfigurationMap(federationConfiguration.getTransformerConfigurationMap());

            this.resolveUpstreamConfiguration();

            try {
                this.scheduleConnect(0, message);
            } catch (Exception e) {
                throw new ActiveMQException(e.getMessage(), e, ActiveMQExceptionType.GENERIC_EXCEPTION);
            }
            deployed = true;
        } finally {
            if (!deployed) {
                // Without this rollback a failed deploy would leave started == true
                // and every subsequent deploy() would silently do nothing.
                this.started.set(false);
            }
        }
        ActiveMQServerLogger.LOGGER.federationDownstreamDeployed(this.config.getName());
    }

    /**
     * Disconnects the stream if it is currently deployed.
     *
     * @throws IllegalStateException if disconnecting fails with an {@link ActiveMQException}
     */
    public void undeploy() {
        try {
            if (this.started.compareAndSet(true, false)) {
                this.disconnect();
                ActiveMQServerLogger.LOGGER.federationDownstreamUnDeployed(this.config.getName());
            }
        } catch (ActiveMQException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    /**
     * Fills in the upstream transport configuration from the server's connector
     * configurations when only a reference to it was configured.
     *
     * @throws ActiveMQException if the referenced connector does not exist
     */
    private void resolveUpstreamConfiguration() throws ActiveMQException {
        String upstreamRef = this.config.getUpstreamConfigurationRef();
        if (upstreamRef == null || this.config.getUpstreamConfiguration() != null) {
            return;
        }
        TransportConfiguration[] configs = this.server.getConfiguration().getTransportConfigurations(upstreamRef);
        if (configs == null || configs.length == 0) {
            ActiveMQServerLogger.LOGGER.federationCantFindUpstreamConnector(this.config.getName(), upstreamRef);
            throw new ActiveMQException("Could not locate upstream transport configuration for federation downstream: " + this.config.getName() + "; upstream ref: " + upstreamRef);
        }
        this.config.setUpstreamConfiguration(configs[0]);
    }

    /**
     * Schedules a connect attempt after {@code delay} seconds. On failure the
     * attempt is rescheduled with the next delay computed by
     * {@code FederatedQueueConsumer.getNextDelay}; retries stop once the stream
     * is no longer deployed.
     *
     * @param delay   seconds to wait before attempting to connect
     * @param message connect message to send once per established session
     */
    private void scheduleConnect(int delay, FederationStreamConnectMessage message) {
        this.scheduledExecutorService.schedule(() -> {
            if (!this.started.get()) {
                // Undeployed (or failed) while this attempt was pending: do not reconnect.
                return;
            }
            try {
                this.connect();
                if (this.initialized.compareAndSet(false, true)) {
                    this.channel.send(message);
                }
            } catch (Exception e) {
                // A failed send must not leave initialized == true, otherwise the
                // retry would skip the send and the message would never go out.
                this.initialized.set(false);
                if (this.started.get()) {
                    this.scheduleConnect(FederatedQueueConsumer.getNextDelay(delay, INITIAL_CONNECT_DELAY_MULTIPLIER, INITIAL_CONNECT_DELAY_MAX), message);
                }
            }
        }, delay, TimeUnit.SECONDS);
    }

    /**
     * Opens the client session and federation channel if no session exists yet.
     * On failure the session factory is cleaned up, the stream is disconnected
     * and the original exception is rethrown.
     *
     * @throws Exception whatever session creation or start raised
     */
    private void connect() throws Exception {
        if (this.clientSession != null) {
            return;
        }
        try {
            synchronized (this) {
                this.clientSessionFactory = (ClientSessionFactoryInternal) this.getConnection().clientSessionFactory();
                this.clientSession = (ClientSessionInternal) this.clientSessionFactory.createSession(this.getUser(), this.getPassword(), false, true, true, this.clientSessionFactory.getServerLocator().isPreAcknowledge(), this.clientSessionFactory.getServerLocator().getAckBatchSize());
                this.clientSession.addFailureListener(this);
                this.clientSession.addMetaData("federation-name", this.federation.getName().toString());
                this.clientSession.addMetaData(FEDERATION_DOWNSTREAM_NAME, this.config.getName().toString());
                this.clientSession.start();
                CoreRemotingConnection remotingConnection = (CoreRemotingConnection) this.clientSessionFactory.getConnection();
                this.channel = remotingConnection.getChannel(ChannelImpl.CHANNEL_ID.FEDERATION.id, -1);
            }
        } catch (Exception e) {
            try {
                if (this.clientSessionFactory != null) {
                    this.clientSessionFactory.cleanup();
                }
                this.disconnect();
            } catch (ActiveMQException cleanupFailure) {
                // Best-effort cleanup: the connect failure is the one to report,
                // but keep the cleanup failure attached for diagnosis.
                if (cleanupFailure != e) {
                    e.addSuppressed(cleanupFailure);
                }
            }
            throw e;
        }
    }

    /** Delegates to {@link #connectionFailed(ActiveMQException, boolean, String)} with no scale-down target. */
    @Override
    public void connectionFailed(ActiveMQException exception, boolean failedOver) {
        this.connectionFailed(exception, failedOver, null);
    }

    /**
     * Drops all state tied to the failed connection and restarts the stream.
     *
     * <p>Each teardown step is attempted independently and the fields are
     * cleared up front, so a failing (or missing) channel or factory can no
     * longer leave a stale session behind that would make the next
     * {@link #connect()} a no-op.
     */
    @Override
    public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
        this.started.set(false);
        this.initialized.set(false);

        Channel failedChannel = this.channel;
        ClientSessionFactoryInternal failedFactory = this.clientSessionFactory;
        this.channel = null;
        this.clientSession = null;
        this.clientSessionFactory = null;

        if (failedChannel != null) {
            try {
                failedChannel.close();
            } catch (Throwable ignored) {
                // best effort: the connection is already known to be broken
            }
        }
        if (failedFactory != null) {
            try {
                failedFactory.cleanup();
            } catch (Throwable ignored) {
                // best effort: still try to close the factory below
            }
            try {
                failedFactory.close();
            } catch (Throwable ignored) {
                // best effort: the connection is already known to be broken
            }
        }
        this.start();
    }

    /**
     * Closes the channel and session and, unless the connection is shared and
     * the factory still has sessions, the session factory. The channel and
     * session fields are cleared even if closing fails.
     *
     * @throws ActiveMQException if closing the session fails
     */
    private void disconnect() throws ActiveMQException {
        this.initialized.set(false);

        Channel staleChannel = this.channel;
        ClientSessionInternal staleSession = this.clientSession;
        this.channel = null;
        this.clientSession = null;

        try {
            if (staleChannel != null) {
                staleChannel.close();
            }
        } finally {
            try {
                if (staleSession != null) {
                    staleSession.close();
                }
            } finally {
                this.closeFactoryIfUnused();
            }
        }
    }

    /** Closes and forgets the session factory unless a shared connection still has sessions on it. */
    private void closeFactoryIfUnused() {
        ClientSessionFactoryInternal factory = this.clientSessionFactory;
        if (factory == null) {
            return;
        }
        boolean sharedAndInUse = this.getConnection().isSharedConnection() && factory.numSessions() != 0;
        if (!sharedAndInUse) {
            factory.close();
            this.clientSessionFactory = null;
        }
    }

    /** No action is taken before a reconnect. */
    @Override
    public void beforeReconnect(ActiveMQException exception) {
    }
}

