/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.job.process.normalizer;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.LengthEncodedWriter;
import org.elasticsearch.xpack.ml.job.process.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerProcess;
import org.elasticsearch.xpack.ml.job.process.normalizer.output.NormalizerResultHandler;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;

class NativeNormalizerProcess
implements NormalizerProcess {
    private static final Logger LOGGER = Loggers.getLogger(NativeNormalizerProcess.class);
    private final String jobId;
    private final Settings settings;
    private final CppLogMessageHandler cppLogHandler;
    private final OutputStream processInStream;
    private final InputStream processOutStream;
    private final LengthEncodedWriter recordWriter;
    private volatile boolean processCloseInitiated;
    private Future<?> logTailThread;

    NativeNormalizerProcess(String jobId, Settings settings, InputStream logStream, OutputStream processInStream, InputStream processOutStream, ExecutorService executorService) throws EsRejectedExecutionException {
        this.jobId = jobId;
        this.settings = settings;
        this.cppLogHandler = new CppLogMessageHandler(jobId, logStream);
        this.processInStream = new BufferedOutputStream(processInStream);
        this.processOutStream = processOutStream;
        this.recordWriter = new LengthEncodedWriter(this.processInStream);
        this.logTailThread = executorService.submit(() -> {
            try (CppLogMessageHandler h = this.cppLogHandler;){
                h.tailStream();
            }
            catch (IOException e) {
                LOGGER.error((Message)new ParameterizedMessage("[{}] Error tailing normalizer process logs", new Object[]{jobId}), (Throwable)e);
            }
            finally {
                if (!this.processCloseInitiated) {
                    LOGGER.error("[{}] normalizer process stopped unexpectedly", (Object)jobId);
                }
            }
        });
    }

    @Override
    public void writeRecord(String[] record) throws IOException {
        this.recordWriter.writeRecord(record);
    }

    @Override
    public void close() throws IOException {
        try {
            this.processCloseInitiated = true;
            this.processInStream.close();
            this.logTailThread.get(5L, TimeUnit.MINUTES);
            if (this.cppLogHandler.seenFatalError()) {
                throw ExceptionsHelper.serverError(this.cppLogHandler.getErrors());
            }
            LOGGER.debug("[{}] Normalizer process exited", (Object)this.jobId);
        }
        catch (ExecutionException | TimeoutException e) {
            LOGGER.warn((Message)new ParameterizedMessage("[{}] Exception closing the running normalizer process", new Object[]{this.jobId}), (Throwable)e);
        }
        catch (InterruptedException e) {
            LOGGER.warn("[{}] Exception closing the running normalizer process", (Object)this.jobId);
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public NormalizerResultHandler createNormalizedResultsHandler() {
        return new NormalizerResultHandler(this.settings, this.processOutStream);
    }

    @Override
    public boolean isProcessAlive() {
        return !this.cppLogHandler.hasLogStreamEnded();
    }

    @Override
    public String readError() {
        return this.cppLogHandler.getErrors();
    }
}

