/*
 * Decompiled with CFR 0.152.
 */
package com.chutneytesting.task.kafka;

import com.chutneytesting.task.amqp.utils.JsonPathEvaluator;
import com.chutneytesting.task.kafka.KafkaConsumerFactoryFactory;
import com.chutneytesting.task.spi.Task;
import com.chutneytesting.task.spi.TaskExecutionResult;
import com.chutneytesting.task.spi.injectable.Input;
import com.chutneytesting.task.spi.injectable.Logger;
import com.chutneytesting.task.spi.injectable.Target;
import com.chutneytesting.task.spi.time.Duration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;

/**
 * Task that consumes messages from a Kafka topic until the expected number of
 * (optionally filtered) messages has been received or the timeout elapses.
 *
 * <p>Every consumed record is exposed as a map with two entries:
 * <ul>
 *   <li>{@code "headers"}: header key mapped to its value decoded as UTF-8;</li>
 *   <li>{@code "payload"}: the record value parsed as a JSON object when possible,
 *       otherwise the raw string value.</li>
 * </ul>
 *
 * <p>Messages are appended by the listener container while {@link #execute()} waits on a
 * latch, so every access to the shared message list is guarded by the list's own monitor.
 */
public class KafkaBasicConsumeTask implements Task {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final String topic;
    private final Logger logger;
    private final Integer nbMessages;
    private final String timeout;
    private final String selector;
    private final ConsumerFactory<String, String> consumerFactory;
    private final CountDownLatch countDownLatch;
    // Written by the listener, read by execute(): must be a synchronized list.
    private final List<Map<String, Object>> consumedMessages =
        Collections.synchronizedList(new ArrayList<Map<String, Object>>());
    private final ConcurrentMessageListenerContainer<String, String> messageListenerContainer;

    /**
     * @param target     target handed to {@link KafkaConsumerFactoryFactory} to build the consumer factory
     * @param topic      topic to consume from
     * @param group      consumer group handed to the consumer factory
     * @param properties extra consumer properties; {@code null} is treated as an empty map
     * @param nbMessages number of messages to wait for; defaults to 1 when {@code null}
     * @param selector   optional expression evaluated by {@link JsonPathEvaluator} against the JSON
     *                   form of each message; when blank every message is accepted
     * @param timeout    maximum waiting time, parsed by {@link Duration#parse}; defaults to
     *                   {@code "60 sec"} when empty
     * @param logger     logger receiving progress and error messages
     */
    public KafkaBasicConsumeTask(Target target, @Input("topic") String topic, @Input("group") String group, @Input("properties") Map<String, String> properties, @Input("nb-messages") Integer nbMessages, @Input("selector") String selector, @Input("timeout") String timeout, Logger logger) {
        this.topic = topic;
        this.logger = logger;
        this.nbMessages = ObjectUtils.defaultIfNull(nbMessages, 1);
        this.selector = selector;
        this.timeout = StringUtils.defaultIfEmpty(timeout, "60 sec");
        this.consumerFactory = new KafkaConsumerFactoryFactory().create(target, group, ObjectUtils.defaultIfNull(properties, Collections.<String, String>emptyMap()));
        this.countDownLatch = new CountDownLatch(this.nbMessages);
        this.messageListenerContainer = createMessageListenerContainer(createMessageListener());
    }

    /**
     * Starts the listener container, waits for the expected number of messages (or the
     * timeout) and always stops the container afterwards.
     *
     * @return a successful result holding the outputs {@code "body"} (all messages),
     *         {@code "payloads"} and {@code "headers"}; a failed result when the expected
     *         number of messages was not received in time or when an exception occurred
     */
    public TaskExecutionResult execute() {
        try {
            logger.info("Consuming message from topic " + topic);
            messageListenerContainer.start();
            countDownLatch.await(Duration.parse(timeout).toMilliseconds(), TimeUnit.MILLISECONDS);

            // Work on a snapshot: the listener may still be running until the container is stopped.
            List<Map<String, Object>> messages;
            synchronized (consumedMessages) {
                messages = new ArrayList<>(consumedMessages);
            }

            if (messages.size() != nbMessages.intValue()) {
                logger.error("Unable to get the expected number of messages [" + nbMessages + "] during " + timeout + " from topic " + topic + ".");
                return TaskExecutionResult.ko();
            }

            logger.info("Consumed [" + nbMessages + "] Kafka Messages from topic " + topic);
            Map<String, Object> results = new HashMap<>();
            results.put("body", messages);
            results.put("payloads", messages.stream().map(m -> m.get("payload")).collect(Collectors.toList()));
            results.put("headers", messages.stream().map(m -> m.get("headers")).collect(Collectors.toList()));
            return TaskExecutionResult.ok(results);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            logger.error("An exception occurs when consuming a message to Kafka server: " + e.getMessage());
            return TaskExecutionResult.ko();
        } catch (Exception e) {
            logger.error("An exception occurs when consuming a message to Kafka server: " + e.getMessage());
            return TaskExecutionResult.ko();
        } finally {
            messageListenerContainer.stop();
        }
    }

    /**
     * Builds the listener that converts each record into a message map and keeps it when
     * no selector is configured or when the selector matches the JSON form of the message.
     * Records received once the latch has reached zero are ignored.
     */
    private MessageListener<String, String> createMessageListener() {
        return record -> {
            if (countDownLatch.getCount() <= 0L) {
                return;
            }
            Map<String, Object> message = extractMessageFromRecord(record);
            if (StringUtils.isBlank(selector)) {
                addMessageToResultAndCountDown(message);
                return;
            }
            try {
                String messageAsString = OBJECT_MAPPER.writeValueAsString(message);
                if (JsonPathEvaluator.evaluate(messageAsString, selector)) {
                    addMessageToResultAndCountDown(message);
                }
            } catch (JsonProcessingException e) {
                logger.info("Received a message, however cannot read process it as json, Ignoring message selection.");
            }
        };
    }

    /**
     * Stores the message and counts the latch down, atomically with respect to other
     * accesses to the message list, so that no more than the expected number of messages
     * is ever recorded.
     */
    private void addMessageToResultAndCountDown(Map<String, Object> message) {
        synchronized (consumedMessages) {
            if (countDownLatch.getCount() > 0L) {
                consumedMessages.add(message);
                countDownLatch.countDown();
            }
        }
    }

    /**
     * Parses the record value as a JSON object.
     *
     * @return the parsed map, the raw string when the value is not a JSON object,
     *         or {@code null} when the record has no value
     */
    private Object extractPayload(ConsumerRecord<String, String> record) {
        String value = record.value();
        if (value == null) {
            // A null value is not an IOException for Jackson; handle it here instead of failing in the listener.
            return null;
        }
        try {
            return OBJECT_MAPPER.readValue(value, Map.class);
        } catch (IOException e) {
            logger.info("Received a message, however cannot read it as Json fallback as String.");
            return value;
        }
    }

    /** Builds the message map ({@code "headers"} and {@code "payload"}) for one record. */
    private Map<String, Object> extractMessageFromRecord(ConsumerRecord<String, String> record) {
        Map<String, Object> message = new HashMap<>();
        message.put("headers", extractHeaders(record));
        message.put("payload", extractPayload(record));
        return message;
    }

    /**
     * Collects the record headers into a map of key to UTF-8 decoded value.
     * When a key appears several times the last occurrence wins; a header without
     * value is mapped to {@code null}.
     */
    private Map<String, Object> extractHeaders(ConsumerRecord<String, String> record) {
        Map<String, Object> headers = new HashMap<>();
        for (Header header : record.headers().toArray()) {
            byte[] rawValue = header.value();
            headers.put(header.key(), rawValue == null ? null : new String(rawValue, StandardCharsets.UTF_8));
        }
        return headers;
    }

    /** Creates the listener container bound to the configured topic and the given listener. */
    private ConcurrentMessageListenerContainer<String, String> createMessageListenerContainer(MessageListener<String, String> messageListener) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener(messageListener);
        return new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties);
    }
}

