/*
 * Decompiled with CFR 0.152.
 */
package com.chutneytesting.task.amqp;

import com.chutneytesting.task.amqp.ConnectionFactoryFactory;
import com.chutneytesting.task.amqp.consumer.QueueingConsumer;
import com.chutneytesting.task.spi.Task;
import com.chutneytesting.task.spi.TaskExecutionResult;
import com.chutneytesting.task.spi.injectable.Input;
import com.chutneytesting.task.spi.injectable.Logger;
import com.chutneytesting.task.spi.injectable.Target;
import com.chutneytesting.task.spi.time.Duration;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;

public class AmqpBasicConsumeTask
implements Task {
    private final ConnectionFactoryFactory connectionFactoryFactory = new ConnectionFactoryFactory();
    private final ConnectionFactory connectionFactory;
    private final String queueName;
    private final Integer nbMessages;
    private final String selector;
    private final String timeout;
    private final Boolean ack;
    private final Logger logger;

    public AmqpBasicConsumeTask(Target target, @Input(value="queue-name") String queueName, @Input(value="nb-messages") Integer nbMessages, @Input(value="selector") String selector, @Input(value="timeout") String timeout, @Input(value="ack") Boolean ack, Logger logger) {
        this.connectionFactory = this.connectionFactoryFactory.create(target);
        this.queueName = queueName;
        this.logger = logger;
        this.nbMessages = (Integer)ObjectUtils.defaultIfNull((Object)nbMessages, (Object)1);
        this.timeout = (String)StringUtils.defaultIfEmpty((CharSequence)timeout, (CharSequence)"60 sec");
        this.selector = selector;
        this.ack = (Boolean)ObjectUtils.defaultIfNull((Object)ack, (Object)true);
    }

    /*
     * Enabled aggressive exception aggregation
     */
    public TaskExecutionResult execute() {
        try (Connection connection = this.connectionFactory.newConnection();){
            TaskExecutionResult taskExecutionResult;
            block18: {
                QueueingConsumer.Result result;
                Channel channel;
                block16: {
                    TaskExecutionResult taskExecutionResult2;
                    block17: {
                        channel = connection.createChannel();
                        try {
                            long duration = Duration.parse((String)this.timeout).toMilliseconds();
                            result = new QueueingConsumer(channel, this.queueName, this.nbMessages, this.selector, duration, this.ack).consume();
                            if (result.messages.size() == this.nbMessages.intValue()) break block16;
                            this.logger.error("Unable to get the expected number of messages [" + this.nbMessages + "] during " + this.timeout + ".");
                            taskExecutionResult2 = TaskExecutionResult.ko();
                            if (channel == null) break block17;
                        }
                        catch (Throwable throwable) {
                            if (channel != null) {
                                try {
                                    channel.close();
                                }
                                catch (Throwable throwable2) {
                                    throwable.addSuppressed(throwable2);
                                }
                            }
                            throw throwable;
                        }
                        channel.close();
                    }
                    return taskExecutionResult2;
                }
                HashMap<String, List<Object>> results = new HashMap<String, List<Object>>();
                results.put("body", result.messages);
                results.put("payloads", result.payloads);
                results.put("headers", result.headers);
                taskExecutionResult = TaskExecutionResult.ok(results);
                if (channel == null) break block18;
                channel.close();
            }
            return taskExecutionResult;
        }
        catch (IOException | InterruptedException | TimeoutException e) {
            this.logger.error("Unable to establish connection to RabbitMQ: " + e.getMessage());
            return TaskExecutionResult.ko();
        }
    }
}

