package uk.co.gresearch.siembol.enrichments.storm;

import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.co.gresearch.siembol.common.error.ErrorMessage;
import uk.co.gresearch.siembol.common.error.ErrorType;
import uk.co.gresearch.siembol.common.model.StormEnrichmentAttributesDto;
import uk.co.gresearch.siembol.common.model.ZooKeeperAttributesDto;
import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnector;
import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactory;
import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactoryImpl;
import uk.co.gresearch.siembol.enrichments.common.EnrichmentResult;
import uk.co.gresearch.siembol.enrichments.compiler.EnrichmentCompilerImpl;
import uk.co.gresearch.siembol.enrichments.evaluation.EnrichmentEvaluator;
import uk.co.gresearch.siembol.enrichments.storm.common.EnrichmentCommands;
import uk.co.gresearch.siembol.enrichments.storm.common.EnrichmentExceptions;
import uk.co.gresearch.siembol.enrichments.storm.common.EnrichmentTuples;

/* loaded from: input_file:uk/co/gresearch/siembol/enrichments/storm/EnrichmentEvaluatorBolt.class */
/**
 * Storm bolt that evaluates enrichment rules against incoming events.
 *
 * <p>The rules are read as a string from a {@link ZooKeeperConnector}, compiled into an
 * {@link EnrichmentEvaluator} and stored in an {@link AtomicReference}. A cache listener
 * registered in {@link #prepare} recompiles the rules whenever the connector notifies it.
 * For every input tuple the bolt emits the original event together with the enrichment
 * commands and the exceptions collected during evaluation.
 */
public class EnrichmentEvaluatorBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final String INIT_EXCEPTION_MSG_FORMAT = "Enriching rule engine exception: %s during initialising alerts engine";
    private static final String UPDATE_EXCEPTION_LOG = "Exception during enriching rule engine update: {}";
    private static final String ENGINE_INIT_MESSAGE = "Enriching rule engine exception: Engine initialisation error";
    private static final String ENGINE_INIT_START = "Enriching rule engine initialisation start";
    private static final String ENGINE_INIT_COMPLETED = "Enriching rule engine initialisation completed";
    private static final String ENGINE_UPDATE_START = "Enriching rule engine update start";
    private static final String ENGINE_UPDATE_COMPLETED = "Enriching rule engine update completed";
    private static final String ENGINE_UPDATE_TRY_MSG_FORMAT = "Enriching rule engine is trying to update the rules: {}";
    private static final String EXCEPTION_RULE_EVALUATION = "Exception during enriching rule evaluation: {}";
    protected static final String COMPILER_EXCEPTION_MSG_FORMAT = "Exception during enriching rules compilation: %s";

    /** Currently active evaluator; replaced as a whole on every successful rules update. */
    protected final AtomicReference<EnrichmentEvaluator> enrichmentEvaluator;
    /** Collector handed over by Storm in {@link #prepare}; used to emit and ack tuples. */
    private OutputCollector collector;
    /** Connector created in {@link #prepare}; source of the rules string. */
    private ZooKeeperConnector zooKeeperConnector;
    private final ZooKeeperAttributesDto zooKeeperAttributes;
    private final ZooKeeperConnectorFactory zooKeeperConnectorFactory;

    /**
     * Creates the bolt with an explicit connector factory.
     *
     * <p>NOTE: the decompiler reported that this constructor was originally package-private;
     * it is kept public here so that the visible interface of this file does not change.
     *
     * @param stormEnrichmentAttributesDto attributes from which the enriching rules ZooKeeper
     *                                     attributes are read
     * @param zooKeeperConnectorFactory    factory used in {@link #prepare} to create the connector
     */
    public EnrichmentEvaluatorBolt(StormEnrichmentAttributesDto stormEnrichmentAttributesDto, ZooKeeperConnectorFactory zooKeeperConnectorFactory) {
        this.enrichmentEvaluator = new AtomicReference<>();
        this.zooKeeperAttributes = stormEnrichmentAttributesDto.getEnrichingRulesZookeperAttributes();
        this.zooKeeperConnectorFactory = zooKeeperConnectorFactory;
    }

    /**
     * Creates the bolt using a {@link ZooKeeperConnectorFactoryImpl} as the connector factory.
     *
     * @param stormEnrichmentAttributesDto attributes from which the enriching rules ZooKeeper
     *                                     attributes are read
     */
    public EnrichmentEvaluatorBolt(StormEnrichmentAttributesDto stormEnrichmentAttributesDto) {
        this(stormEnrichmentAttributesDto, new ZooKeeperConnectorFactoryImpl());
    }

    /**
     * Stores the collector, creates the ZooKeeper connector, loads the initial rules and
     * registers {@link #updateRules()} as a cache listener.
     *
     * @param map             configuration passed by Storm (not used by this bolt)
     * @param topologyContext topology context passed by Storm (not used by this bolt)
     * @param outputCollector collector used later by {@link #execute} for emitting and acking
     * @throws IllegalStateException if the connector cannot be created or no evaluator is
     *                               available after the initial rules load; the triggering
     *                               exception is attached as the cause
     */
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        try {
            LOG.info(ENGINE_INIT_START);
            this.zooKeeperConnector = this.zooKeeperConnectorFactory.createZookeeperConnector(this.zooKeeperAttributes);
            updateRules();
            // updateRules() only logs its failures, so a missing evaluator is the
            // signal that the initial compilation did not succeed.
            if (this.enrichmentEvaluator.get() == null) {
                throw new IllegalStateException(ENGINE_INIT_MESSAGE);
            }
            this.zooKeeperConnector.addCacheListener(this::updateRules);
            LOG.info(ENGINE_INIT_COMPLETED);
        } catch (Exception e) {
            String message = String.format(INIT_EXCEPTION_MSG_FORMAT, ExceptionUtils.getStackTrace(e));
            LOG.error(message);
            // Keep the original exception as the cause instead of only embedding its text.
            throw new IllegalStateException(message, e);
        }
    }

    /**
     * Reads the rules from the connector, compiles them and swaps in the new evaluator.
     *
     * <p>Any exception is logged and not rethrown, so the previously stored evaluator
     * (if any) stays in place when an update fails.
     */
    private void updateRules() {
        try {
            LOG.info(ENGINE_UPDATE_START);
            String rules = this.zooKeeperConnector.getData();
            // Only the first 100 characters of the rules are logged.
            LOG.info(ENGINE_UPDATE_TRY_MSG_FORMAT, StringUtils.left(rules, 100));
            this.enrichmentEvaluator.set(getEnrichmentEvaluator(rules));
            LOG.info(ENGINE_UPDATE_COMPLETED);
        } catch (Exception e) {
            LOG.error(UPDATE_EXCEPTION_LOG, ExceptionUtils.getStackTrace(e));
        }
    }

    /**
     * Compiles the given rules into an evaluator.
     *
     * @param str rules to compile
     * @return the rule evaluator taken from the compilation result
     * @throws IllegalStateException if the compiler throws (the exception is attached as the
     *                               cause) or reports a status other than OK
     */
    private EnrichmentEvaluator getEnrichmentEvaluator(String str) {
        final EnrichmentResult compiled;
        try {
            compiled = EnrichmentCompilerImpl.createEnrichmentsCompiler().compile(str);
        } catch (Exception e) {
            String message = String.format(COMPILER_EXCEPTION_MSG_FORMAT, ExceptionUtils.getStackTrace(e));
            LOG.error(message);
            throw new IllegalStateException(message, e);
        }

        // The status check lives outside the try block so that the exception raised here
        // is not caught, logged a second time and re-wrapped by the handler above.
        if (compiled.getStatusCode() != EnrichmentResult.StatusCode.OK) {
            String message = String.format(COMPILER_EXCEPTION_MSG_FORMAT, compiled.getAttributes().getMessage());
            LOG.error(message);
            throw new IllegalStateException(message);
        }
        return compiled.getAttributes().getRuleEvaluator();
    }

    /**
     * Evaluates the enrichment rules on the event carried by the tuple, then emits the event,
     * the collected enrichment commands and the collected exceptions, and acks the tuple.
     *
     * <p>An evaluation failure never fails the tuple: a non-OK result contributes its message
     * (when present) to the exceptions, and a thrown exception is logged and added as an
     * {@link ErrorType#ENRICHMENT_ERROR} error message.
     *
     * @param tuple input tuple; the event is read from the {@link EnrichmentTuples#EVENT} field
     */
    public void execute(Tuple tuple) {
        // Read the reference once so a concurrent rules update cannot change it mid-call.
        EnrichmentEvaluator evaluator = this.enrichmentEvaluator.get();
        String event = tuple.getStringByField(EnrichmentTuples.EVENT.toString());
        EnrichmentCommands commands = new EnrichmentCommands();
        EnrichmentExceptions exceptions = new EnrichmentExceptions();
        try {
            EnrichmentResult result = evaluator.evaluate(event);
            if (result.getStatusCode() == EnrichmentResult.StatusCode.OK) {
                if (result.getAttributes().getEnrichmentCommands() != null) {
                    commands.addAll(result.getAttributes().getEnrichmentCommands());
                }
            } else if (result.getAttributes().getMessage() != null) {
                exceptions.add(result.getAttributes().getMessage());
            }
        } catch (Exception e) {
            LOG.error(EXCEPTION_RULE_EVALUATION, ExceptionUtils.getStackTrace(e));
            exceptions.add(ErrorMessage.createErrorMessage(e, ErrorType.ENRICHMENT_ERROR).toString());
        }
        this.collector.emit(tuple, new Values(event, commands, exceptions));
        this.collector.ack(tuple);
    }

    /**
     * Declares the three output fields, in emission order: event, commands, exceptions.
     *
     * @param outputFieldsDeclarer declarer supplied by Storm
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields(
                EnrichmentTuples.EVENT.toString(),
                EnrichmentTuples.COMMANDS.toString(),
                EnrichmentTuples.EXCEPTIONS.toString()));
    }
}
