/*
 * Decompiled with CFR 0.152.
 */
package com.chutneytesting.action.kafka;

import com.chutneytesting.action.kafka.ChutneyKafkaProducerFactory;
import com.chutneytesting.action.spi.Action;
import com.chutneytesting.action.spi.ActionExecutionResult;
import com.chutneytesting.action.spi.injectable.Input;
import com.chutneytesting.action.spi.injectable.Logger;
import com.chutneytesting.action.spi.injectable.Target;
import com.chutneytesting.action.spi.validation.ActionValidatorsUtils;
import com.chutneytesting.action.spi.validation.Validator;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.exec.util.MapUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.core.KafkaTemplate;

public class KafkaBasicPublishAction
implements Action {
    private final ChutneyKafkaProducerFactory producerFactory = new ChutneyKafkaProducerFactory();
    private final Target target;
    private final String topic;
    private final Map<String, String> headers;
    private final String payload;
    private final Map<String, String> properties;
    private final Logger logger;

    public KafkaBasicPublishAction(Target target, @Input(value="topic") String topic, @Input(value="headers") Map<String, String> headers, @Input(value="payload") String payload, @Input(value="properties") Map<String, String> properties, Logger logger) {
        this.target = target;
        this.topic = topic;
        this.headers = headers != null ? headers : Collections.emptyMap();
        this.payload = payload;
        this.properties = Optional.ofNullable(MapUtils.merge(this.extractProducerConfig(target), properties)).orElse(new HashMap());
        this.logger = logger;
    }

    public List<String> validateInputs() {
        return Validator.getErrorsFrom((Validator[])new Validator[]{ActionValidatorsUtils.notBlankStringValidation((String)this.topic, (String)"topic"), ActionValidatorsUtils.notBlankStringValidation((String)this.payload, (String)"payload"), ActionValidatorsUtils.targetValidation((Target)this.target)});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ActionExecutionResult execute() {
        try {
            List recordHeaders = this.headers.entrySet().stream().map(it -> new RecordHeader((String)it.getKey(), ((String)it.getValue()).getBytes())).collect(Collectors.toList());
            this.logger.info("sending message to topic=" + this.topic);
            ProducerRecord producerRecord = new ProducerRecord(this.topic, null, null, (Object)this.payload, recordHeaders);
            KafkaTemplate<String, String> kafkaTemplate = this.producerFactory.create(this.target, this.properties);
            kafkaTemplate.send(producerRecord).get(5L, TimeUnit.SECONDS);
            this.logger.info("Published Kafka Message on topic " + this.topic);
            ActionExecutionResult actionExecutionResult = ActionExecutionResult.ok(this.toOutputs(this.headers, this.payload));
            return actionExecutionResult;
        }
        catch (Exception e) {
            this.logger.error("An exception occurs when sending a message to Kafka server: " + e.getMessage());
            ActionExecutionResult actionExecutionResult = ActionExecutionResult.ko();
            return actionExecutionResult;
        }
        finally {
            try {
                this.producerFactory.destroy();
            }
            catch (Exception e) {
                this.logger.error((Throwable)e);
            }
        }
    }

    private Map<String, Object> toOutputs(Map<String, String> headers, String payload) {
        HashMap<String, Object> results = new HashMap<String, Object>();
        results.put("payload", payload);
        results.put("headers", headers.entrySet().stream().map(Object::toString).collect(Collectors.joining(";", "[", "]")));
        return results;
    }

    private Map<String, String> extractProducerConfig(Target target) {
        if (target != null) {
            HashMap<String, String> config = new HashMap<String, String>();
            ProducerConfig.configDef().configKeys().keySet().forEach(ck -> target.property(ck).ifPresent(cv -> config.put((String)ck, (String)cv)));
            return config;
        }
        return Collections.emptyMap();
    }
}

