/*
 * Decompiled with CFR 0.152.
 */
package com.chutneytesting.task.amqp;

import com.chutneytesting.task.amqp.ConnectionFactoryFactory;
import com.chutneytesting.task.amqp.consumer.ConsumerSupervisor;
import com.chutneytesting.task.amqp.consumer.QueueingConsumer;
import com.chutneytesting.task.spi.Task;
import com.chutneytesting.task.spi.TaskExecutionResult;
import com.chutneytesting.task.spi.injectable.Input;
import com.chutneytesting.task.spi.injectable.Logger;
import com.chutneytesting.task.spi.injectable.Target;
import com.chutneytesting.task.spi.time.Duration;
import com.chutneytesting.task.spi.validation.TaskValidatorsUtils;
import com.chutneytesting.task.spi.validation.Validator;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

public class AmqpBasicConsumeTask
implements Task {
    private final ConnectionFactoryFactory connectionFactoryFactory = new ConnectionFactoryFactory();
    private final Target target;
    private final String queueName;
    private final Integer nbMessages;
    private final String selector;
    private final String timeout;
    private final Boolean ack;
    private final Logger logger;

    public AmqpBasicConsumeTask(Target target, @Input(value="queue-name") String queueName, @Input(value="nb-messages") Integer nbMessages, @Input(value="selector") String selector, @Input(value="timeout") String timeout, @Input(value="ack") Boolean ack, Logger logger) {
        this.target = target;
        this.queueName = queueName;
        this.logger = logger;
        this.nbMessages = (Integer)ObjectUtils.defaultIfNull((Object)nbMessages, (Object)1);
        this.timeout = (String)StringUtils.defaultIfEmpty((CharSequence)timeout, (CharSequence)"10 sec");
        this.selector = selector;
        this.ack = (Boolean)ObjectUtils.defaultIfNull((Object)ack, (Object)true);
    }

    public List<String> validateInputs() {
        return Validator.getErrorsFrom((Validator[])new Validator[]{TaskValidatorsUtils.notBlankStringValidation((String)this.queueName, (String)"queue-name"), TaskValidatorsUtils.targetValidation((Target)this.target), TaskValidatorsUtils.durationValidation((String)this.timeout, (String)"timeout")});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public TaskExecutionResult execute() {
        long originalDuration = Duration.parse((String)this.timeout).toMilliseconds();
        ConsumerSupervisor instance = ConsumerSupervisor.getInstance();
        Connection connection = null;
        Channel channel = null;
        try {
            Pair<Boolean, Long> waitingResult = instance.waitUntilQueueAvailable(this.queueName, originalDuration, this.logger);
            boolean lockAcquired = (Boolean)waitingResult.getLeft();
            if (!lockAcquired) {
                TaskExecutionResult taskExecutionResult = TaskExecutionResult.ko();
                try {
                    this.closeChannel(channel);
                    this.closeConnection(connection);
                    return taskExecutionResult;
                }
                finally {
                    instance.unlock(this.queueName);
                }
            }
            connection = this.connectionFactoryFactory.newConnection(this.target);
            channel = connection.createChannel();
            long consumingDuration = (Long)waitingResult.getRight();
            QueueingConsumer.Result result = new QueueingConsumer(channel, this.queueName, this.nbMessages, this.selector, consumingDuration, this.ack).consume();
            if (result.messages.size() != this.nbMessages.intValue()) {
                this.logger.error("Unable to get the expected number of messages [" + this.nbMessages + "] during " + this.timeout + ".");
                TaskExecutionResult taskExecutionResult = TaskExecutionResult.ko();
                try {
                    this.closeChannel(channel);
                    this.closeConnection(connection);
                    return taskExecutionResult;
                }
                finally {
                    instance.unlock(this.queueName);
                }
            }
            this.logger.info("Message(s) found in " + result.consumeDuration);
            TaskExecutionResult taskExecutionResult = TaskExecutionResult.ok(this.extractOutputs(result));
            try {
                this.closeChannel(channel);
                this.closeConnection(connection);
                return taskExecutionResult;
            }
            finally {
                instance.unlock(this.queueName);
            }
        }
        catch (IOException | InterruptedException | TimeoutException e) {
            this.logger.error("Unable to establish connection to RabbitMQ: " + e.getMessage());
            TaskExecutionResult taskExecutionResult = TaskExecutionResult.ko();
            return taskExecutionResult;
        }
        finally {
            try {
                this.closeChannel(channel);
                this.closeConnection(connection);
            }
            finally {
                instance.unlock(this.queueName);
            }
        }
    }

    private Map<String, Object> extractOutputs(QueueingConsumer.Result result) {
        HashMap<String, Object> results = new HashMap<String, Object>();
        results.put("body", result.messages);
        results.put("payloads", result.payloads);
        results.put("headers", result.headers);
        return results;
    }

    private void closeConnection(Connection connection) {
        if (connection != null && connection.isOpen()) {
            try {
                connection.close(1000);
            }
            catch (IOException e) {
                this.logger.error("Error during connection closing: " + e.getMessage());
            }
        }
    }

    private void closeChannel(Channel channel) {
        if (channel != null && channel.isOpen()) {
            try {
                channel.close();
            }
            catch (IOException | TimeoutException e) {
                this.logger.error("Error during channel closing: " + e.getMessage());
            }
        }
    }
}

