/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.processor.grok;

import io.krakens.grok.api.Grok;
import io.krakens.grok.api.GrokCompiler;
import io.krakens.grok.api.Match;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.NotDirectoryException;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.annotations.SingleThread;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.processor.grok.GrokProcessorConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SingleThread
@DataPrepperPlugin(name="grok", pluginType=Processor.class, pluginConfigurationType=GrokProcessorConfig.class)
public class GrokProcessor
extends AbstractProcessor<Record<Event>, Record<Event>> {
    static final long EXECUTOR_SERVICE_SHUTDOWN_TIMEOUT = 300L;
    private static final Logger LOG = LoggerFactory.getLogger(GrokProcessor.class);
    private static final String DATA_PREPPER_GROK_PATTERNS_FILE = "grok-patterns/patterns";
    static final String GROK_PROCESSING_MATCH = "grokProcessingMatch";
    static final String GROK_PROCESSING_MISMATCH = "grokProcessingMismatch";
    static final String GROK_PROCESSING_ERRORS = "grokProcessingErrors";
    static final String GROK_PROCESSING_TIMEOUTS = "grokProcessingTimeouts";
    static final String GROK_PROCESSING_TIME = "grokProcessingTime";
    private final Counter grokProcessingMismatchCounter;
    private final Counter grokProcessingMatchCounter;
    private final Counter grokProcessingErrorsCounter;
    private final Counter grokProcessingTimeoutsCounter;
    private final Timer grokProcessingTime;
    private final GrokCompiler grokCompiler;
    private final Map<String, List<Grok>> fieldToGrok;
    private final GrokProcessorConfig grokProcessorConfig;
    private final Set<String> keysToOverwrite;
    private final ExecutorService executorService;
    private final List<String> tagsOnMatchFailure;
    private final List<String> tagsOnTimeout;
    private final ExpressionEvaluator expressionEvaluator;

    @DataPrepperPluginConstructor
    public GrokProcessor(PluginMetrics pluginMetrics, GrokProcessorConfig grokProcessorConfig, ExpressionEvaluator expressionEvaluator) {
        this(pluginMetrics, grokProcessorConfig, GrokCompiler.newInstance(), Executors.newSingleThreadExecutor(), expressionEvaluator);
    }

    GrokProcessor(PluginMetrics pluginMetrics, GrokProcessorConfig grokProcessorConfig, GrokCompiler grokCompiler, ExecutorService executorService, ExpressionEvaluator expressionEvaluator) {
        super(pluginMetrics);
        this.grokProcessorConfig = grokProcessorConfig;
        this.keysToOverwrite = new HashSet<String>(grokProcessorConfig.getkeysToOverwrite());
        this.grokCompiler = grokCompiler;
        this.fieldToGrok = new LinkedHashMap<String, List<Grok>>();
        this.executorService = executorService;
        this.expressionEvaluator = expressionEvaluator;
        this.tagsOnMatchFailure = grokProcessorConfig.getTagsOnMatchFailure();
        this.tagsOnTimeout = grokProcessorConfig.getTagsOnTimeout().isEmpty() ? grokProcessorConfig.getTagsOnMatchFailure() : grokProcessorConfig.getTagsOnTimeout();
        this.grokProcessingMatchCounter = pluginMetrics.counter(GROK_PROCESSING_MATCH);
        this.grokProcessingMismatchCounter = pluginMetrics.counter(GROK_PROCESSING_MISMATCH);
        this.grokProcessingErrorsCounter = pluginMetrics.counter(GROK_PROCESSING_ERRORS);
        this.grokProcessingTimeoutsCounter = pluginMetrics.counter(GROK_PROCESSING_TIMEOUTS);
        this.grokProcessingTime = pluginMetrics.timer(GROK_PROCESSING_TIME);
        this.registerPatterns();
        this.compileMatchPatterns();
        if (grokProcessorConfig.getGrokWhen() != null && !expressionEvaluator.isValidExpressionStatement(grokProcessorConfig.getGrokWhen()).booleanValue()) {
            throw new InvalidPluginConfigurationException(String.format("grok_when \"%s\" is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", grokProcessorConfig.getGrokWhen()));
        }
    }

    public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
        for (Record<Event> record : records) {
            Instant startTime = Instant.now();
            Event event = (Event)record.getData();
            try {
                if (Objects.nonNull(this.grokProcessorConfig.getGrokWhen()) && !this.expressionEvaluator.evaluateConditional(this.grokProcessorConfig.getGrokWhen(), event).booleanValue()) continue;
                if (this.grokProcessorConfig.getTimeoutMillis() == 0) {
                    this.grokProcessingTime.record(() -> this.matchAndMerge(event));
                } else {
                    this.runWithTimeout(() -> this.grokProcessingTime.record(() -> this.matchAndMerge(event)));
                }
            }
            catch (TimeoutException e) {
                event.getMetadata().addTags(this.tagsOnTimeout);
                LOG.atError().addMarker(DataPrepperMarkers.EVENT).addMarker(DataPrepperMarkers.NOISY).setMessage("Matching on record [{}] took longer than [{}] and timed out").addArgument(record.getData()).addArgument((Object)this.grokProcessorConfig.getTimeoutMillis()).log();
                this.grokProcessingTimeoutsCounter.increment();
            }
            catch (InterruptedException | RuntimeException | ExecutionException e) {
                event.getMetadata().addTags(this.tagsOnMatchFailure);
                LOG.atError().addMarker(DataPrepperMarkers.EVENT).addMarker(DataPrepperMarkers.NOISY).setMessage("An exception occurred when matching record [{}]").addArgument(record.getData()).setCause((Throwable)e).log();
                this.grokProcessingErrorsCounter.increment();
            }
            Instant endTime = Instant.now();
            if (!this.grokProcessorConfig.getIncludePerformanceMetadata()) continue;
            Long totalEventTimeInGrok = (Long)event.getMetadata().getAttribute("_total_grok_processing_time");
            if (totalEventTimeInGrok == null) {
                totalEventTimeInGrok = 0L;
            }
            long timeSpentInThisGrok = endTime.toEpochMilli() - startTime.toEpochMilli();
            event.getMetadata().setAttribute("_total_grok_processing_time", (Object)(totalEventTimeInGrok + timeSpentInThisGrok));
        }
        return records;
    }

    public void prepareForShutdown() {
    }

    public boolean isReadyForShutdown() {
        return true;
    }

    public void shutdown() {
        this.executorService.shutdown();
        try {
            if (this.executorService.awaitTermination(300L, TimeUnit.MILLISECONDS)) {
                LOG.info("Successfully waited for running task to terminate");
            } else {
                LOG.warn("Running task did not terminate in time, forcing termination");
                this.executorService.shutdownNow();
            }
        }
        catch (InterruptedException e) {
            LOG.error("Interrupted while waiting for running task to terminate", (Throwable)e);
            this.executorService.shutdownNow();
        }
    }

    private void registerPatterns() {
        this.grokCompiler.registerDefaultPatterns();
        this.registerBuiltInDataPrepperGrokPatterns();
        this.grokCompiler.register(this.grokProcessorConfig.getPatternDefinitions());
        this.registerPatternsDirectories();
    }

    private void registerBuiltInDataPrepperGrokPatterns() {
        try (InputStream directoryStream = ((Object)((Object)this)).getClass().getClassLoader().getResourceAsStream(DATA_PREPPER_GROK_PATTERNS_FILE);){
            this.grokCompiler.register(directoryStream);
        }
        catch (Exception e) {
            LOG.error("An exception occurred while initializing built in grok patterns for Data Prepper", (Throwable)e);
        }
    }

    private void registerPatternsDirectories() {
        for (String directory : this.grokProcessorConfig.getPatternsDirectories()) {
            Path path = FileSystems.getDefault().getPath(directory, new String[0]);
            try {
                DirectoryStream<Path> stream = Files.newDirectoryStream(path, this.grokProcessorConfig.getPatternsFilesGlob());
                try {
                    for (Path patternFile : stream) {
                        this.registerPatternsForFile(patternFile.toFile());
                    }
                }
                finally {
                    if (stream == null) continue;
                    stream.close();
                }
            }
            catch (PatternSyntaxException e) {
                LOG.error("Glob pattern {} is invalid", (Object)this.grokProcessorConfig.getPatternsFilesGlob());
            }
            catch (NotDirectoryException e) {
                LOG.error("{} is not a directory", (Object)directory, (Object)e);
            }
            catch (IOException e) {
                LOG.error("Error getting directory {}", (Object)directory, (Object)e);
            }
        }
    }

    private void registerPatternsForFile(File file) {
        try (FileInputStream in = new FileInputStream(file);){
            this.grokCompiler.register((InputStream)in);
        }
        catch (FileNotFoundException e) {
            LOG.error("Pattern file {} not found", (Object)file, (Object)e);
        }
        catch (IOException e) {
            LOG.error("Error reading from pattern file {}", (Object)file, (Object)e);
        }
    }

    private void compileMatchPatterns() {
        for (Map.Entry<String, List<String>> entry : this.grokProcessorConfig.getMatch().entrySet()) {
            this.fieldToGrok.put(entry.getKey(), entry.getValue().stream().map(item -> {
                try {
                    return this.grokCompiler.compile(item, this.grokProcessorConfig.isNamedCapturesOnly());
                }
                catch (IllegalArgumentException e) {
                    throw new InvalidPluginConfigurationException(String.format("Invalid regular expression pattern in match: %s", entry.getKey()), (Throwable)e);
                }
            }).collect(Collectors.toList()));
        }
    }

    private void matchAndMerge(Event event) {
        HashMap<String, Object> grokkedCaptures = new HashMap<String, Object>();
        int patternsAttempted = 0;
        for (Map.Entry<String, List<Grok>> entry : this.fieldToGrok.entrySet()) {
            for (Grok grok : entry.getValue()) {
                String value = (String)event.get(entry.getKey(), String.class);
                if (value == null || value.isEmpty()) continue;
                Match match = grok.match((CharSequence)value);
                match.setKeepEmptyCaptures(this.grokProcessorConfig.isKeepEmptyCaptures());
                Map captures = match.capture();
                this.mergeCaptures(grokkedCaptures, (Map<String, Object>)captures);
                ++patternsAttempted;
                if (!this.shouldBreakOnMatch(grokkedCaptures)) continue;
                break;
            }
            if (!this.shouldBreakOnMatch(grokkedCaptures)) continue;
            break;
        }
        if (this.grokProcessorConfig.getTargetKey() != null) {
            event.put(this.grokProcessorConfig.getTargetKey(), grokkedCaptures);
        } else {
            this.mergeCaptures(event, grokkedCaptures);
        }
        if (grokkedCaptures.isEmpty()) {
            if (this.tagsOnMatchFailure != null && this.tagsOnMatchFailure.size() > 0) {
                event.getMetadata().addTags(this.tagsOnMatchFailure);
            }
            this.grokProcessingMismatchCounter.increment();
        } else {
            this.grokProcessingMatchCounter.increment();
        }
        if (this.grokProcessorConfig.getIncludePerformanceMetadata()) {
            Integer totalPatternsAttemptedForEvent = (Integer)event.getMetadata().getAttribute("_total_grok_patterns_attempted");
            if (totalPatternsAttemptedForEvent == null) {
                totalPatternsAttemptedForEvent = 0;
            }
            event.getMetadata().setAttribute("_total_grok_patterns_attempted", (Object)(totalPatternsAttemptedForEvent + patternsAttempted));
        }
    }

    private void mergeCaptures(Map<String, Object> original, Map<String, Object> updates) {
        for (Map.Entry<String, Object> updateEntry : updates.entrySet()) {
            if (!original.containsKey(updateEntry.getKey()) || this.keysToOverwrite.contains(updateEntry.getKey())) {
                original.put(updateEntry.getKey(), updateEntry.getValue());
                continue;
            }
            if (original.get(updateEntry.getKey()) instanceof List) {
                this.mergeValueWithValues(updateEntry.getValue(), (List)original.get(updateEntry.getKey()));
                continue;
            }
            ArrayList<Object> values = new ArrayList<Object>(Collections.singletonList(original.get(updateEntry.getKey())));
            this.mergeValueWithValues(updateEntry.getValue(), values);
            original.put(updateEntry.getKey(), values);
        }
    }

    private void mergeCaptures(Event event, Map<String, Object> updates) {
        for (Map.Entry<String, Object> updateEntry : updates.entrySet()) {
            if (!event.containsKey(updateEntry.getKey()) || this.keysToOverwrite.contains(updateEntry.getKey())) {
                event.put(updateEntry.getKey(), updateEntry.getValue());
                continue;
            }
            if (event.isValueAList(updateEntry.getKey())) {
                List values = event.getList(updateEntry.getKey(), Object.class);
                this.mergeValueWithValues(updateEntry.getValue(), values);
                event.put(updateEntry.getKey(), (Object)values);
                continue;
            }
            Object fieldObject = event.get(updateEntry.getKey(), Object.class);
            ArrayList<Object> values = new ArrayList<Object>(Collections.singletonList(fieldObject));
            this.mergeValueWithValues(updateEntry.getValue(), values);
            event.put(updateEntry.getKey(), values);
        }
    }

    private void mergeValueWithValues(Object value, List<Object> values) {
        if (value instanceof List) {
            values.addAll((List)value);
        } else {
            values.add(value);
        }
    }

    private boolean shouldBreakOnMatch(Map<String, Object> captures) {
        return captures.size() > 0 && this.grokProcessorConfig.isBreakOnMatch();
    }

    private void runWithTimeout(Runnable runnable) throws TimeoutException, ExecutionException, InterruptedException {
        Future<?> task = this.executorService.submit(runnable);
        try {
            task.get(this.grokProcessorConfig.getTimeoutMillis(), TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException exception) {
            task.cancel(true);
            throw exception;
        }
    }
}

