package uk.co.gresearch.siembol.enrichments.storm;

import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Optional;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.co.gresearch.siembol.common.constants.SiembolMessageFields;
import uk.co.gresearch.siembol.common.error.ErrorMessage;
import uk.co.gresearch.siembol.common.error.ErrorType;
import uk.co.gresearch.siembol.common.model.StormEnrichmentAttributesDto;
import uk.co.gresearch.siembol.common.storm.KafkaWriterMessage;
import uk.co.gresearch.siembol.common.storm.KafkaWriterMessages;
import uk.co.gresearch.siembol.enrichments.evaluation.EnrichmentEvaluatorLibrary;
import uk.co.gresearch.siembol.enrichments.storm.common.EnrichmentExceptions;
import uk.co.gresearch.siembol.enrichments.storm.common.EnrichmentPairs;
import uk.co.gresearch.siembol.enrichments.storm.common.EnrichmentTuples;

/* loaded from: input_file:uk/co/gresearch/siembol/enrichments/storm/EnrichmentMergerBolt.class */
public class EnrichmentMergerBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String INVALID_TYPE_IN_TUPLES = "Invalid type in tuple provided";
    private static final String MERGING_ERROR = "Unable to merge the event: {} with the enrichments : {}";
    private static final String EVENT_INFO_LOG = "The event after enrichments: {}";
    private OutputCollector collector;
    private final String outputTopic;
    private final String errorTopic;

    public EnrichmentMergerBolt(StormEnrichmentAttributesDto stormEnrichmentAttributesDto) {
        this.outputTopic = stormEnrichmentAttributesDto.getEnrichingOutputTopic();
        this.errorTopic = stormEnrichmentAttributesDto.getEnrichingErrorTopic();
    }

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    public void execute(Tuple tuple) {
        Object valueByField = tuple.getValueByField(EnrichmentTuples.ENRICHMENTS.toString());
        if (!(valueByField instanceof EnrichmentPairs)) {
            LOG.error(INVALID_TYPE_IN_TUPLES);
            throw new IllegalArgumentException(INVALID_TYPE_IN_TUPLES);
        }
        EnrichmentPairs enrichmentPairs = (EnrichmentPairs) valueByField;
        String stringByField = tuple.getStringByField(EnrichmentTuples.EVENT.toString());
        Object valueByField2 = tuple.getValueByField(EnrichmentTuples.EXCEPTIONS.toString());
        if (!(valueByField2 instanceof EnrichmentExceptions)) {
            LOG.error(INVALID_TYPE_IN_TUPLES);
            throw new IllegalArgumentException(INVALID_TYPE_IN_TUPLES);
        }
        EnrichmentExceptions enrichmentExceptions = (EnrichmentExceptions) valueByField2;
        try {
            stringByField = EnrichmentEvaluatorLibrary.mergeEnrichments(stringByField, enrichmentPairs, Optional.of(SiembolMessageFields.ENRICHING_TIME.toString()));
        } catch (Exception e) {
            LOG.error(MERGING_ERROR, stringByField, enrichmentPairs.toString());
            enrichmentExceptions.add(ErrorMessage.createErrorMessage(e, ErrorType.ENRICHMENT_ERROR).toString());
        }
        LOG.debug(EVENT_INFO_LOG, stringByField);
        KafkaWriterMessages kafkaWriterMessages = new KafkaWriterMessages();
        kafkaWriterMessages.add(new KafkaWriterMessage(this.outputTopic, stringByField));
        enrichmentExceptions.forEach(str -> {
            kafkaWriterMessages.add(new KafkaWriterMessage(this.errorTopic, str));
        });
        this.collector.emit(tuple, new Values(new Object[]{kafkaWriterMessages}));
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(new String[]{EnrichmentTuples.KAFKA_MESSAGES.toString()}));
    }
}
