package org.apache.activemq.artemis.cli.commands.messages;

import io.airlift.airline.Command;
import io.airlift.airline.Option;
import java.io.FileOutputStream;
import java.io.OutputStream;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;

@Command(name = "consumer", description = "Consume messages from a queue.")
/* loaded from: input_file:org/apache/activemq/artemis/cli/commands/messages/Consumer.class */
public class Consumer extends DestAbstract {

    @Option(name = {"--durable"}, description = "Whether the consumer's subscription will be durable.")
    boolean durable = false;

    @Option(name = {"--break-on-null"}, description = "Stop consuming when a null message is received.")
    boolean breakOnNull = false;

    @Option(name = {"--receive-timeout"}, description = "Timeout for receiving messages (in milliseconds).")
    int receiveTimeout = 3000;

    @Option(name = {"--filter"}, description = "The message filter.")
    String filter;

    @Option(name = {"--data"}, description = "Serialize the messages to the specified file as they are consumed.")
    String file;

    /* loaded from: input_file:org/apache/activemq/artemis/cli/commands/messages/Consumer$SerialiserMessageListener.class */
    private class SerialiserMessageListener implements MessageListener {
        private MessageSerializer messageSerializer;

        SerialiserMessageListener(MessageSerializer messageSerializer, OutputStream outputStream) throws Exception {
            this.messageSerializer = messageSerializer;
            this.messageSerializer.setOutput(outputStream);
        }

        public void onMessage(Message message) {
            this.messageSerializer.write(message);
        }
    }

    @Override // org.apache.activemq.artemis.cli.commands.messages.ConnectionAbstract, org.apache.activemq.artemis.cli.commands.InputAbstract, org.apache.activemq.artemis.cli.commands.ActionAbstract, org.apache.activemq.artemis.cli.commands.Action
    public Object execute(ActionContext actionContext) throws Exception {
        super.execute(actionContext);
        actionContext.out.println("Consumer:: filter = " + this.filter);
        SerialiserMessageListener serialiserMessageListener = null;
        MessageSerializer messageSerializer = null;
        if (this.file != null) {
            messageSerializer = getMessageSerializer();
            if (messageSerializer == null) {
                System.err.println("Error. Unable to instantiate serializer class: " + this.serializer);
                return null;
            }
            try {
                serialiserMessageListener = new SerialiserMessageListener(messageSerializer, new FileOutputStream(this.file));
                messageSerializer.start();
            } catch (Exception e) {
                System.err.println("Error: Unable to open file for writing\n" + e.getMessage());
                return null;
            }
        }
        Connection createConnection = createConnectionFactory().createConnection();
        try {
            ConsumerThread[] consumerThreadArr = new ConsumerThread[this.threads];
            for (int i = 0; i < this.threads; i++) {
                Session createSession = this.txBatchSize > 0 ? createConnection.createSession(true, 0) : createConnection.createSession(false, 1);
                consumerThreadArr[i] = new ConsumerThread(createSession, getDestination(createSession), i);
                consumerThreadArr[i].setVerbose(this.verbose).setSleep(this.sleep).setDurable(this.durable).setBatchSize(this.txBatchSize).setBreakOnNull(this.breakOnNull).setMessageCount(this.messageCount).setReceiveTimeOut(this.receiveTimeout).setFilter(this.filter).setBrowse(false).setListener(serialiserMessageListener);
            }
            for (ConsumerThread consumerThread : consumerThreadArr) {
                consumerThread.start();
            }
            createConnection.start();
            int i2 = 0;
            for (ConsumerThread consumerThread2 : consumerThreadArr) {
                consumerThread2.join();
                i2 += consumerThread2.getReceived();
            }
            if (messageSerializer != null) {
                messageSerializer.stop();
            }
            Integer valueOf = Integer.valueOf(i2);
            if (createConnection != null) {
                createConnection.close();
            }
            return valueOf;
        } catch (Throwable th) {
            if (createConnection != null) {
                try {
                    createConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public boolean isDurable() {
        return this.durable;
    }

    public Consumer setDurable(boolean z) {
        this.durable = z;
        return this;
    }

    public boolean isBreakOnNull() {
        return this.breakOnNull;
    }

    public Consumer setBreakOnNull(boolean z) {
        this.breakOnNull = z;
        return this;
    }

    public int getReceiveTimeout() {
        return this.receiveTimeout;
    }

    public Consumer setReceiveTimeout(int i) {
        this.receiveTimeout = i;
        return this;
    }

    public String getFilter() {
        return this.filter;
    }

    public Consumer setFilter(String str) {
        this.filter = str;
        return this;
    }

    public String getFile() {
        return this.file;
    }

    public Consumer setFile(String str) {
        this.file = str;
        return this;
    }
}
