package org.elasticsearch.xpack.ml.datafeed;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.ClientHelper;
import org.elasticsearch.xpack.ml.action.FlushJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.ml.notifications.Auditor;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:lib/org.elasticsearch.plugin.xpack.api-6.1.3.jar:org/elasticsearch/xpack/ml/datafeed/DatafeedJob.class */
public class DatafeedJob {
    private static final Logger LOGGER = Loggers.getLogger((Class<?>) DatafeedJob.class);
    private static final int NEXT_TASK_DELAY_MS = 100;
    private final Auditor auditor;
    private final String jobId;
    private final DataDescription dataDescription;
    private final long frequencyMs;
    private final long queryDelayMs;
    private final Client client;
    private final DataExtractorFactory dataExtractorFactory;
    private final Supplier<Long> currentTimeSupplier;
    private volatile long lookbackStartTimeMs;
    private volatile Long lastEndTimeMs;
    private AtomicBoolean running = new AtomicBoolean(true);
    private volatile boolean isIsolated;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/org.elasticsearch.plugin.xpack.api-6.1.3.jar:org/elasticsearch/xpack/ml/datafeed/DatafeedJob$AnalysisProblemException.class */
    public static class AnalysisProblemException extends RuntimeException {
        final boolean shouldStop;
        final long nextDelayInMsSinceEpoch;

        AnalysisProblemException(long j, boolean z, Throwable th) {
            super(th);
            this.shouldStop = z;
            this.nextDelayInMsSinceEpoch = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/org.elasticsearch.plugin.xpack.api-6.1.3.jar:org/elasticsearch/xpack/ml/datafeed/DatafeedJob$EmptyDataCountException.class */
    public static class EmptyDataCountException extends RuntimeException {
        final long nextDelayInMsSinceEpoch;

        EmptyDataCountException(long j) {
            this.nextDelayInMsSinceEpoch = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/org.elasticsearch.plugin.xpack.api-6.1.3.jar:org/elasticsearch/xpack/ml/datafeed/DatafeedJob$ExtractionProblemException.class */
    public static class ExtractionProblemException extends RuntimeException {
        final long nextDelayInMsSinceEpoch;

        ExtractionProblemException(long j, Throwable th) {
            super(th);
            this.nextDelayInMsSinceEpoch = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DatafeedJob(String str, DataDescription dataDescription, long j, long j2, DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier<Long> supplier, long j3, long j4) {
        this.jobId = str;
        this.dataDescription = (DataDescription) Objects.requireNonNull(dataDescription);
        this.frequencyMs = j;
        this.queryDelayMs = j2;
        this.dataExtractorFactory = dataExtractorFactory;
        this.client = client;
        this.auditor = auditor;
        this.currentTimeSupplier = supplier;
        long max = Math.max(j3, j4);
        if (max > 0) {
            this.lastEndTimeMs = Long.valueOf(max);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void isolate() {
        this.isIsolated = true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isIsolated() {
        return this.isIsolated;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Long runLookBack(long j, Long l) throws Exception {
        this.lookbackStartTimeMs = skipToStartTime(j);
        Optional ofNullable = Optional.ofNullable(l);
        long longValue = ((Long) ofNullable.orElse(Long.valueOf(this.currentTimeSupplier.get().longValue() - this.queryDelayMs))).longValue();
        boolean isPresent = ofNullable.isPresent();
        if (longValue <= this.lookbackStartTimeMs) {
            if (isPresent) {
                return null;
            }
            this.auditor.info(this.jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STARTED_REALTIME));
            return Long.valueOf(nextRealtimeTimestamp());
        }
        Object[] objArr = new Object[3];
        objArr[0] = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(this.lookbackStartTimeMs);
        objArr[1] = l == null ? "real-time" : DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.printer().print(longValue);
        objArr[2] = TimeValue.timeValueMillis(this.frequencyMs).getStringRep();
        String message = Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STARTED_FROM_TO, objArr);
        this.auditor.info(this.jobId, message);
        LOGGER.info("[{}] {}", this.jobId, message);
        FlushJobAction.Request request = new FlushJobAction.Request(this.jobId);
        request.setCalcInterim(true);
        run(this.lookbackStartTimeMs, longValue, request);
        if (!isRunning() || this.isIsolated) {
            if (this.isIsolated) {
                return null;
            }
            LOGGER.debug("Lookback finished after being stopped");
            return null;
        }
        this.auditor.info(this.jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_LOOKBACK_COMPLETED));
        LOGGER.info("[{}] Lookback has finished", this.jobId);
        if (isPresent) {
            return null;
        }
        this.auditor.info(this.jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_CONTINUED_REALTIME));
        return Long.valueOf(nextRealtimeTimestamp());
    }

    private long skipToStartTime(long j) {
        if (this.lastEndTimeMs == null) {
            return j;
        }
        if (this.lastEndTimeMs.longValue() + 1 > j) {
            return this.lastEndTimeMs.longValue() + 1;
        }
        FlushJobAction.Request request = new FlushJobAction.Request(this.jobId);
        request.setSkipTime(String.valueOf(j));
        FlushJobAction.Response flushJob = flushJob(request);
        LOGGER.info("Skipped to time [" + flushJob.getLastFinalizedBucketEnd().getTime() + "]");
        return flushJob.getLastFinalizedBucketEnd().getTime();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long runRealtime() throws Exception {
        long max = this.lastEndTimeMs == null ? this.lookbackStartTimeMs : Math.max(this.lookbackStartTimeMs, this.lastEndTimeMs.longValue() + 1);
        long intervalStartEpochMs = toIntervalStartEpochMs(this.currentTimeSupplier.get().longValue() - this.queryDelayMs);
        FlushJobAction.Request request = new FlushJobAction.Request(this.jobId);
        request.setCalcInterim(true);
        request.setAdvanceTime(String.valueOf(intervalStartEpochMs));
        run(max, intervalStartEpochMs, request);
        return nextRealtimeTimestamp();
    }

    public boolean stop() {
        return this.running.compareAndSet(true, false);
    }

    public boolean isRunning() {
        return this.running.get();
    }

    private void run(long j, long j2, FlushJobAction.Request request) throws IOException {
        if (j2 <= j) {
            return;
        }
        LOGGER.trace("[{}] Searching data in: [{}, {})", this.jobId, Long.valueOf(j), Long.valueOf(j2));
        AnalysisProblemException analysisProblemException = null;
        long j3 = 0;
        DataExtractor newExtractor = this.dataExtractorFactory.newExtractor(j, j2);
        while (newExtractor.hasNext()) {
            if ((this.isIsolated || !isRunning()) && !newExtractor.isCancelled()) {
                newExtractor.cancel();
            }
            if (this.isIsolated) {
                return;
            }
            try {
                Optional<InputStream> next = newExtractor.next();
                if (this.isIsolated) {
                    return;
                }
                if (next.isPresent()) {
                    try {
                        InputStream inputStream = next.get();
                        Throwable th = null;
                        try {
                            try {
                                DataCounts postData = postData(inputStream, XContentType.JSON);
                                LOGGER.trace("[{}] Processed another {} records", this.jobId, Long.valueOf(postData.getProcessedRecordCount()));
                                if (inputStream != null) {
                                    if (0 != 0) {
                                        try {
                                            inputStream.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        inputStream.close();
                                    }
                                }
                                j3 += postData.getProcessedRecordCount();
                                if (postData.getLatestRecordTimeStamp() != null) {
                                    this.lastEndTimeMs = Long.valueOf(postData.getLatestRecordTimeStamp().getTime());
                                }
                            } finally {
                            }
                        } finally {
                        }
                    } catch (Exception e) {
                        if (e instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        if (this.isIsolated) {
                            return;
                        }
                        LOGGER.debug("[" + this.jobId + "] error while posting data", (Throwable) e);
                        analysisProblemException = new AnalysisProblemException(nextRealtimeTimestamp(), isConflictException(e), e);
                    }
                }
            } catch (Exception e2) {
                LOGGER.debug("[" + this.jobId + "] error while extracting data", (Throwable) e2);
                if (!e2.toString().contains("doc values")) {
                    throw new ExtractionProblemException(nextRealtimeTimestamp(), e2);
                }
                throw new ExtractionProblemException(nextRealtimeTimestamp(), new IllegalArgumentException("One or more fields do not have doc values; please enable doc values for all analysis fields for datafeeds using aggregations"));
            }
        }
        this.lastEndTimeMs = Long.valueOf(Math.max(this.lastEndTimeMs == null ? 0L : this.lastEndTimeMs.longValue(), j2 - 1));
        LOGGER.debug("[{}] Complete iterating data extractor [{}], [{}], [{}], [{}], [{}]", this.jobId, analysisProblemException, Long.valueOf(j3), this.lastEndTimeMs, Boolean.valueOf(isRunning()), Boolean.valueOf(newExtractor.isCancelled()));
        if (analysisProblemException != null) {
            throw analysisProblemException;
        }
        if (isRunning() && !this.isIsolated) {
            flushJob(request);
        }
        if (j3 == 0) {
            throw new EmptyDataCountException(nextRealtimeTimestamp());
        }
    }

    private DataCounts postData(InputStream inputStream, XContentType xContentType) throws IOException {
        PostDataAction.Request request = new PostDataAction.Request(this.jobId);
        request.setDataDescription(this.dataDescription);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Streams.copy(inputStream, byteArrayOutputStream);
        request.setContent(new BytesArray(byteArrayOutputStream.toByteArray()), xContentType);
        ThreadContext.StoredContext stashWithOrigin = ClientHelper.stashWithOrigin(this.client.threadPool().getThreadContext(), "ml");
        Throwable th = null;
        try {
            try {
                DataCounts dataCounts = ((PostDataAction.Response) this.client.execute(PostDataAction.INSTANCE, request).actionGet()).getDataCounts();
                if (stashWithOrigin != null) {
                    if (0 != 0) {
                        try {
                            stashWithOrigin.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        stashWithOrigin.close();
                    }
                }
                return dataCounts;
            } finally {
            }
        } catch (Throwable th3) {
            if (stashWithOrigin != null) {
                if (th != null) {
                    try {
                        stashWithOrigin.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    stashWithOrigin.close();
                }
            }
            throw th3;
        }
    }

    private boolean isConflictException(Exception exc) {
        return (exc instanceof ElasticsearchStatusException) && ((ElasticsearchStatusException) exc).status() == RestStatus.CONFLICT;
    }

    private long nextRealtimeTimestamp() {
        return toIntervalStartEpochMs(this.currentTimeSupplier.get().longValue() + this.frequencyMs) + this.queryDelayMs + 100;
    }

    private long toIntervalStartEpochMs(long j) {
        return (j / this.frequencyMs) * this.frequencyMs;
    }

    private FlushJobAction.Response flushJob(FlushJobAction.Request request) {
        try {
            LOGGER.trace("[" + this.jobId + "] Sending flush request");
            ThreadContext.StoredContext stashWithOrigin = ClientHelper.stashWithOrigin(this.client.threadPool().getThreadContext(), "ml");
            Throwable th = null;
            try {
                try {
                    FlushJobAction.Response response = (FlushJobAction.Response) this.client.execute(FlushJobAction.INSTANCE, request).actionGet();
                    if (stashWithOrigin != null) {
                        if (0 != 0) {
                            try {
                                stashWithOrigin.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            stashWithOrigin.close();
                        }
                    }
                    return response;
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            LOGGER.debug("[" + this.jobId + "] error while flushing job", (Throwable) e);
            throw new AnalysisProblemException(nextRealtimeTimestamp(), isConflictException(e), e);
        }
    }

    Long lastEndTimeMs() {
        return this.lastEndTimeMs;
    }
}
