/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.dataprepper.plugins.ml_inference.processor;

import io.micrometer.core.instrument.Counter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.expression.ExpressionParsingException;
import org.opensearch.dataprepper.logging.DataPrepperMarkers;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.annotations.Experimental;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.processor.AbstractProcessor;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.ml_inference.processor.MLProcessorConfig;
import org.opensearch.dataprepper.plugins.ml_inference.processor.common.MLBatchJobCreator;
import org.opensearch.dataprepper.plugins.ml_inference.processor.common.MLBatchJobCreatorFactory;
import org.opensearch.dataprepper.plugins.ml_inference.processor.configuration.ServiceName;
import org.opensearch.dataprepper.plugins.ml_inference.processor.exception.MLBatchJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
@DataPrepperPlugin(name="ml_inference", pluginType=Processor.class, pluginConfigurationType=MLProcessorConfig.class)
public class MLProcessor
extends AbstractProcessor<Record<Event>, Record<Event>> {
    public static final Logger LOG = LoggerFactory.getLogger(MLProcessor.class);
    public static final String NUMBER_OF_ML_PROCESSOR_SUCCESS = "BatchJobRequestsSucceeded";
    public static final String NUMBER_OF_ML_PROCESSOR_FAILED = "BatchJobRequestsFailed";
    private final String whenCondition;
    private final MLBatchJobCreator mlBatchJobCreator;
    private final Counter numberOfMLProcessorSuccessCounter;
    private final Counter numberOfMLProcessorFailedCounter;
    private final ExpressionEvaluator expressionEvaluator;

    @DataPrepperPluginConstructor
    public MLProcessor(MLProcessorConfig mlProcessorConfig, PluginMetrics pluginMetrics, AwsCredentialsSupplier awsCredentialsSupplier, ExpressionEvaluator expressionEvaluator) {
        super(pluginMetrics);
        this.whenCondition = mlProcessorConfig.getWhenCondition();
        ServiceName serviceName = mlProcessorConfig.getServiceName();
        this.numberOfMLProcessorSuccessCounter = pluginMetrics.counter(NUMBER_OF_ML_PROCESSOR_SUCCESS);
        this.numberOfMLProcessorFailedCounter = pluginMetrics.counter(NUMBER_OF_ML_PROCESSOR_FAILED);
        this.expressionEvaluator = expressionEvaluator;
        this.mlBatchJobCreator = MLBatchJobCreatorFactory.getJobCreator(serviceName, mlProcessorConfig, awsCredentialsSupplier, pluginMetrics);
    }

    public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
        ArrayList<Record<Event>> resultRecords = new ArrayList<Record<Event>>();
        this.mlBatchJobCreator.checkAndProcessBatch();
        this.mlBatchJobCreator.addProcessedBatchRecordsToResults(resultRecords);
        if (records.size() == 0) {
            return resultRecords;
        }
        List<Record<Event>> recordsToMlCommons = records.stream().filter(record -> {
            try {
                boolean meetCondition;
                boolean bl = meetCondition = this.whenCondition == null || this.expressionEvaluator.evaluateConditional(this.whenCondition, (Event)record.getData()) != false;
                if (!meetCondition) {
                    resultRecords.add((Record<Event>)record);
                }
                return meetCondition;
            }
            catch (ExpressionParsingException e) {
                LOG.warn("Expression parsing failed for record: {}. Error: {}", record, (Object)e.getMessage());
                resultRecords.add((Record<Event>)record);
                return false;
            }
            catch (ClassCastException e) {
                LOG.warn("Unexpected return type when evaluating condition for record: {}. Error: {}", record, (Object)e.getMessage());
                resultRecords.add((Record<Event>)record);
                return false;
            }
            catch (Exception e) {
                LOG.error("Failed to evaluate conditional expression for record: {}", record, (Object)e);
                resultRecords.add((Record<Event>)record);
                return false;
            }
        }).collect(Collectors.toList());
        if (recordsToMlCommons.isEmpty()) {
            return resultRecords;
        }
        try {
            this.mlBatchJobCreator.createMLBatchJob(recordsToMlCommons, resultRecords);
            this.numberOfMLProcessorSuccessCounter.increment();
        }
        catch (MLBatchJobException e) {
            LOG.error(DataPrepperMarkers.NOISY, "ML Batch job creation failed: {}", (Object)e.getMessage());
            this.numberOfMLProcessorFailedCounter.increment();
        }
        catch (Exception e) {
            LOG.error(DataPrepperMarkers.NOISY, "Unexpected Error occurred while creating the batch job: {}", (Object)e.getMessage(), (Object)e);
            this.numberOfMLProcessorFailedCounter.increment();
        }
        return resultRecords;
    }

    public void prepareForShutdown() {
        this.mlBatchJobCreator.prepareForShutdown();
    }

    public boolean isReadyForShutdown() {
        return this.mlBatchJobCreator.isReadyForShutdown();
    }

    public void shutdown() {
        this.mlBatchJobCreator.shutdown();
    }
}

