package org.apache.activemq.artemis.cli.commands.messages;

import io.airlift.airline.Command;
import io.airlift.airline.Option;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.InputAbstract;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;

@Command(name = "transfer", description = "Moves Messages from one destination towards another destination")
/* loaded from: input_file:org/apache/activemq/artemis/cli/commands/messages/Transfer.class */
public class Transfer extends InputAbstract {
    private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";

    @Option(name = {"--source-user"}, description = "User used to connect")
    protected String sourceUser;

    @Option(name = {"--source-password"}, description = "Password used to connect")
    protected String sourcePassword;

    @Option(name = {"--target-user"}, description = "User used to connect")
    protected String targetUser;

    @Option(name = {"--target-password"}, description = "Password used to connect")
    protected String targetPassword;

    @Option(name = {"--source-client-id"}, description = "ClientID to be associated with connection")
    String sourceClientID;

    @Option(name = {"--source-queue"}, description = "JMS Queue to be used.")
    String sourceQueue;

    @Option(name = {"--shared-durable-subscription"}, description = "Name of a shared subscription name to be used on the input topic")
    String sharedDurableSubscription;

    @Option(name = {"--shared-subscription"}, description = "Name of a shared subscription name to be used on the input topic")
    String sharedSubscription;

    @Option(name = {"--durable-consumer"}, description = "Name of a durable consumer to be used on the input topic")
    String durableConsumer;

    @Option(name = {"--no-Local"}, description = "Use noLocal when applicable on topic operation")
    boolean noLocal;

    @Option(name = {"--source-topic"}, description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
    String sourceTopic;

    @Option(name = {"--source-filter"}, description = "filter to be used with the consumer")
    String filter;

    @Option(name = {"--copy"}, description = "If this option is chosen we will perform a copy of the queue by rolling back the original TX on the source.")
    boolean copy;

    @Option(name = {"--target-queue"}, description = "JMS Queue to be used.")
    String targetQueue;

    @Option(name = {"--target-topic"}, description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
    String targetTopic;

    @Option(name = {"--source-url"}, description = "URL towards the broker. (default: Read from current broker.xml or tcp://localhost:61616 if the default cannot be parsed)")
    protected String sourceURL = DEFAULT_BROKER_URL;

    @Option(name = {"--target-url"}, description = "URL towards the broker. (default: Read from current broker.xml or tcp://localhost:61616 if the default cannot be parsed)")
    protected String targetURL = DEFAULT_BROKER_URL;

    @Option(name = {"--receive-timeout"}, description = "Amount of time (in milliseconds) to wait before giving up the loop. 0 means receiveNoWait, -1 means consumer.receive() waiting forever. (default=5000)")
    int receiveTimeout = 5000;

    @Option(name = {"--source-protocol"}, description = "Protocol used. Valid values are amqp or core. Default=core.")
    String sourceProtocol = "core";

    @Option(name = {"--target-protocol"}, description = "Protocol used. Valid values are amqp or core. Default=core.")
    String targetProtocol = "core";

    @Option(name = {"--commit-interval"}, description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
    int commitInterval = 1000;

    boolean isCopy() {
        return this.copy;
    }

    @Override // org.apache.activemq.artemis.cli.commands.InputAbstract, org.apache.activemq.artemis.cli.commands.ActionAbstract, org.apache.activemq.artemis.cli.commands.Action
    public Object execute(ActionContext actionContext) throws Exception {
        String brokerURLInstance;
        super.execute(actionContext);
        if (this.sourceURL == DEFAULT_BROKER_URL && (brokerURLInstance = getBrokerURLInstance()) != null) {
            this.sourceURL = brokerURLInstance;
        }
        System.out.println("Connection brokerURL = " + this.sourceURL);
        Connection createConnection = createConnectionFactory("source", this.sourceProtocol, this.sourceURL, this.sourceUser, this.sourcePassword, this.sourceClientID).createConnection();
        Session createSession = createConnection.createSession(0);
        Topic createDestination = createDestination("source", createSession, this.sourceQueue, this.sourceTopic);
        MessageConsumer messageConsumer = null;
        if (createDestination instanceof Queue) {
            messageConsumer = this.filter != null ? createSession.createConsumer(createDestination, this.filter) : createSession.createConsumer(createDestination);
        } else if (createDestination instanceof Topic) {
            Topic topic = createDestination;
            if (this.durableConsumer != null) {
                messageConsumer = this.filter != null ? createSession.createDurableConsumer(topic, this.durableConsumer) : createSession.createDurableConsumer(topic, this.durableConsumer, this.filter, this.noLocal);
            } else if (this.sharedDurableSubscription != null) {
                messageConsumer = this.filter != null ? createSession.createSharedDurableConsumer(topic, this.sharedDurableSubscription, this.filter) : createSession.createSharedDurableConsumer(topic, this.sharedDurableSubscription);
            } else {
                if (this.sharedSubscription == null) {
                    throw new IllegalArgumentException("you have to specify --durable-consumer, --shared-durable-subscription or --shared-subscription with a topic");
                }
                messageConsumer = this.filter != null ? createSession.createSharedConsumer(topic, this.sharedSubscription, this.filter) : createSession.createSharedConsumer(topic, this.sharedSubscription);
            }
        }
        Connection createConnection2 = createConnectionFactory("target", this.targetProtocol, this.targetURL, this.targetUser, this.targetPassword, null).createConnection();
        Session createSession2 = createConnection2.createSession(0);
        Destination createDestination2 = createDestination("target", createSession2, this.targetQueue, this.targetTopic);
        MessageProducer createProducer = createSession2.createProducer(createDestination2);
        if (this.sourceURL.equals(this.targetURL) && createDestination.equals(createDestination2)) {
            System.out.println("You cannot transfer between " + this.sourceURL + "/" + createDestination + " and " + this.targetURL + "/" + createDestination2 + ".\nThat would create an infinite recursion.");
            throw new IllegalArgumentException("cannot use " + createDestination + " == " + createDestination2);
        }
        createConnection.start();
        int i = 0;
        int i2 = 0;
        while (true) {
            Message receive = this.receiveTimeout < 0 ? messageConsumer.receive() : this.receiveTimeout == 0 ? messageConsumer.receiveNoWait() : messageConsumer.receive(this.receiveTimeout);
            if (receive == null) {
                break;
            }
            createProducer.send(receive);
            i++;
            i2++;
            if (isVerbose()) {
                System.out.println("Received message " + i2 + " with " + i + " messages pending to be commited");
            }
            if (i > this.commitInterval) {
                System.out.println("Transferred " + i + " messages of " + i2);
                i = 0;
                createSession2.commit();
                if (!isCopy()) {
                    createSession.commit();
                }
            }
        }
        if (isVerbose()) {
            System.out.println("could not receive any more messages");
        }
        System.out.println("Transferred a total of " + i2 + " messages");
        if (i != 0) {
            createSession2.commit();
            if (isCopy()) {
                createSession.rollback();
            } else {
                createSession.commit();
            }
        }
        createConnection.close();
        createConnection2.close();
        return null;
    }

    Destination createDestination(String str, Session session, String str2, String str3) throws Exception {
        if (str2 != null && str3 != null) {
            throw new IllegalArgumentException("Cannot have topic and queue passed as " + str);
        }
        if (str2 != null) {
            return session.createQueue(str2);
        }
        if (str3 != null) {
            return session.createTopic(str3);
        }
        throw new IllegalArgumentException("You need to pass either a topic or a queue as " + str);
    }

    protected ConnectionFactory createConnectionFactory(String str, String str2, String str3, String str4, String str5, String str6) throws Exception {
        if (str2.equals("core")) {
            if (isVerbose()) {
                System.out.println("Creating " + str + " CORE Connection towards " + str3);
            }
            return createCoreConnectionFactory(str3, str4, str5, str6);
        }
        if (!str2.equals("amqp")) {
            throw new IllegalStateException("protocol " + str2 + " not supported");
        }
        if (isVerbose()) {
            System.out.println("Creating " + str + " AMQP Connection towards " + str3);
        }
        return createAMQPConnectionFactory(str3, str4, str5, str6);
    }

    private ConnectionFactory createAMQPConnectionFactory(String str, String str2, String str3, String str4) {
        if (str.startsWith("tcp://")) {
            str = "amqp" + str.substring(3);
        }
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory(str2, str3, str);
        if (str4 != null) {
            jmsConnectionFactory.setClientID(str4);
        }
        try {
            jmsConnectionFactory.createConnection().close();
            return jmsConnectionFactory;
        } catch (JMSException e) {
            this.context.err.println("Connection failed::" + e.getMessage());
            String input = input("--url", "Type in the broker URL for a retry (e.g. tcp://localhost:61616)", str);
            userPassword(input);
            JmsConnectionFactory jmsConnectionFactory2 = new JmsConnectionFactory(str2, str3, input);
            if (str4 != null) {
                jmsConnectionFactory2.setClientID(str4);
            }
            return jmsConnectionFactory2;
        } catch (JMSSecurityException e2) {
            this.context.err.println("Connection failed::" + e2.getMessage());
            userPassword(str);
            JmsConnectionFactory jmsConnectionFactory3 = new JmsConnectionFactory(str2, str3, str);
            if (str4 != null) {
                jmsConnectionFactory3.setClientID(str4);
            }
            return jmsConnectionFactory3;
        }
    }

    protected ActiveMQConnectionFactory createCoreConnectionFactory(String str, String str2, String str3, String str4) {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(str, str2, str3);
        if (str4 != null) {
            System.out.println("Consumer:: clientID = " + str4);
            activeMQConnectionFactory.setClientID(str4);
        }
        try {
            activeMQConnectionFactory.createConnection().close();
            return activeMQConnectionFactory;
        } catch (JMSException e) {
            if (this.context != null) {
                this.context.err.println("Connection failed::" + e.getMessage());
            }
            String input = input("--url", "Type in the broker URL for a retry (e.g. tcp://localhost:61616)", str);
            Pair<String, String> userPassword = userPassword(input);
            ActiveMQConnectionFactory activeMQConnectionFactory2 = new ActiveMQConnectionFactory(input, (String) userPassword.getA(), (String) userPassword.getB());
            if (str4 != null) {
                activeMQConnectionFactory2.setClientID(str4);
            }
            return activeMQConnectionFactory2;
        } catch (JMSSecurityException e2) {
            if (this.context != null) {
                this.context.err.println("Connection failed::" + e2.getMessage());
            }
            Pair<String, String> userPassword2 = userPassword(str);
            ActiveMQConnectionFactory activeMQConnectionFactory3 = new ActiveMQConnectionFactory(str, (String) userPassword2.getA(), (String) userPassword2.getB());
            if (str4 != null) {
                activeMQConnectionFactory3.setClientID(str4);
            }
            return activeMQConnectionFactory3;
        }
    }

    Pair<String, String> userPassword(String str) {
        System.out.println("Type in user/password towards " + str);
        return new Pair<>(input("--user", "Type the username for a retry", null), inputPassword("--password", "Type the password for a retry", null));
    }
}
