/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.job.process.autodetect;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.ml.job.config.ModelPlotConfig;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutodetectResultsParser;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.StateProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.InterimResultsParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ControlMsgToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter;
import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.ml.job.results.AutodetectResult;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;

class NativeAutodetectProcess
implements AutodetectProcess {
    private static final Logger LOGGER = Loggers.getLogger(NativeAutodetectProcess.class);
    private final String jobId;
    private final CppLogMessageHandler cppLogHandler;
    private final OutputStream processInStream;
    private final InputStream processOutStream;
    private final LengthEncodedWriter recordWriter;
    private final ZonedDateTime startTime;
    private final int numberOfAnalysisFields;
    private final List<Path> filesToDelete;
    private final Runnable onProcessCrash;
    private volatile Future<?> logTailFuture;
    private volatile Future<?> stateProcessorFuture;
    private volatile boolean processCloseInitiated;
    private final AutodetectResultsParser resultsParser;

    NativeAutodetectProcess(String jobId, InputStream logStream, OutputStream processInStream, InputStream processOutStream, int numberOfAnalysisFields, List<Path> filesToDelete, AutodetectResultsParser resultsParser, Runnable onProcessCrash) {
        this.jobId = jobId;
        this.cppLogHandler = new CppLogMessageHandler(jobId, logStream);
        this.processInStream = new BufferedOutputStream(processInStream);
        this.processOutStream = processOutStream;
        this.recordWriter = new LengthEncodedWriter(this.processInStream);
        this.startTime = ZonedDateTime.now();
        this.numberOfAnalysisFields = numberOfAnalysisFields;
        this.filesToDelete = filesToDelete;
        this.resultsParser = resultsParser;
        this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
    }

    public void start(ExecutorService executorService, StateProcessor stateProcessor, InputStream persistStream) {
        this.logTailFuture = executorService.submit(() -> {
            try (CppLogMessageHandler h = this.cppLogHandler;){
                h.tailStream();
            }
            catch (IOException e) {
                LOGGER.error((Message)new ParameterizedMessage("[{}] Error tailing autodetect process logs", (Object)this.jobId), (Throwable)e);
            }
            finally {
                if (!this.processCloseInitiated) {
                    LOGGER.error("[{}] autodetect process stopped unexpectedly", (Object)this.jobId);
                    this.onProcessCrash.run();
                }
            }
        });
        this.stateProcessorFuture = executorService.submit(() -> {
            try (InputStream in = persistStream;){
                stateProcessor.process(this.jobId, in);
            }
            catch (IOException e) {
                LOGGER.error((Message)new ParameterizedMessage("[{}] Error reading autodetect state output", (Object)this.jobId), (Throwable)e);
            }
        });
    }

    @Override
    public void writeRecord(String[] record) throws IOException {
        this.recordWriter.writeRecord(record);
    }

    @Override
    public void writeResetBucketsControlMessage(DataLoadParams params) throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfAnalysisFields);
        writer.writeResetBucketsMessage(params);
    }

    @Override
    public void writeUpdateModelPlotMessage(ModelPlotConfig modelPlotConfig) throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfAnalysisFields);
        writer.writeUpdateModelPlotMessage(modelPlotConfig);
    }

    @Override
    public void writeUpdateDetectorRulesMessage(int detectorIndex, List<DetectionRule> rules) throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfAnalysisFields);
        writer.writeUpdateDetectorRulesMessage(detectorIndex, rules);
    }

    @Override
    public String flushJob(InterimResultsParams params) throws IOException {
        ControlMsgToProcessWriter writer = new ControlMsgToProcessWriter(this.recordWriter, this.numberOfAnalysisFields);
        writer.writeCalcInterimMessage(params);
        return writer.writeFlushMessage();
    }

    @Override
    public void flushStream() throws IOException {
        this.recordWriter.flush();
    }

    @Override
    public void close() throws IOException {
        try {
            this.processCloseInitiated = true;
            this.processInStream.close();
            this.stateProcessorFuture.get(MachineLearning.STATE_PERSIST_RESTORE_TIMEOUT.getMinutes(), TimeUnit.MINUTES);
            this.logTailFuture.get(5L, TimeUnit.SECONDS);
            if (this.cppLogHandler.seenFatalError()) {
                throw ExceptionsHelper.serverError(this.cppLogHandler.getErrors());
            }
            LOGGER.debug("[{}] Autodetect process exited", (Object)this.jobId);
        }
        catch (ExecutionException | TimeoutException e) {
            LOGGER.warn((Message)new ParameterizedMessage("[{}] Exception closing the running autodetect process", (Object)this.jobId), (Throwable)e);
        }
        catch (InterruptedException e) {
            LOGGER.warn((Message)new ParameterizedMessage("[{}] Exception closing the running autodetect process", (Object)this.jobId), (Throwable)e);
            Thread.currentThread().interrupt();
        }
        finally {
            this.deleteAssociatedFiles();
        }
    }

    void deleteAssociatedFiles() throws IOException {
        if (this.filesToDelete == null) {
            return;
        }
        for (Path fileToDelete : this.filesToDelete) {
            if (Files.deleteIfExists(fileToDelete)) {
                LOGGER.debug("[{}] Deleted file {}", (Object)this.jobId, (Object)fileToDelete.toString());
                continue;
            }
            LOGGER.warn("[{}] Failed to delete file {}", (Object)this.jobId, (Object)fileToDelete.toString());
        }
    }

    @Override
    public Iterator<AutodetectResult> readAutodetectResults() {
        return this.resultsParser.parseResults(this.processOutStream);
    }

    @Override
    public ZonedDateTime getProcessStartTime() {
        return this.startTime;
    }

    @Override
    public boolean isProcessAlive() {
        return !this.cppLogHandler.hasLogStreamEnded();
    }

    @Override
    public String readError() {
        return this.cppLogHandler.getErrors();
    }
}

