package org.elasticsearch.xpack.ml.datafeed.extractor.chunked;

import java.io.IOException;
import java.io.InputStream;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;

/* loaded from: input_file:lib/org.elasticsearch.plugin.xpack.api-6.1.3.jar:org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor.class */
public class ChunkedDataExtractor implements DataExtractor {
    private static final Logger LOGGER = Loggers.getLogger((Class<?>) ChunkedDataExtractor.class);
    private static final String EARLIEST_TIME = "earliest_time";
    private static final String LATEST_TIME = "latest_time";
    private static final long MIN_CHUNK_SPAN = 60000;
    private final Client client;
    private final DataExtractorFactory dataExtractorFactory;
    private final ChunkedDataExtractorContext context;
    private long currentStart;
    private long currentEnd;
    private long chunkSpan;
    private boolean isCancelled = false;
    private DataExtractor currentExtractor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/org.elasticsearch.plugin.xpack.api-6.1.3.jar:org/elasticsearch/xpack/ml/datafeed/extractor/chunked/ChunkedDataExtractor$DataSummary.class */
    public class DataSummary {
        private long earliestTime;
        private long latestTime;
        private long totalHits;

        private DataSummary(long j, long j2, long j3) {
            this.earliestTime = j;
            this.latestTime = j2;
            this.totalHits = j3;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getDataTimeSpread() {
            return this.latestTime - this.earliestTime;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long estimateChunk() {
            return (this.totalHits <= 0 || getDataTimeSpread() <= 0) ? ChunkedDataExtractor.this.context.end - ChunkedDataExtractor.this.currentEnd : Math.max((10 * (ChunkedDataExtractor.this.context.scrollSize * getDataTimeSpread())) / this.totalHits, 60000L);
        }
    }

    public ChunkedDataExtractor(Client client, DataExtractorFactory dataExtractorFactory, ChunkedDataExtractorContext chunkedDataExtractorContext) {
        this.client = (Client) Objects.requireNonNull(client);
        this.dataExtractorFactory = (DataExtractorFactory) Objects.requireNonNull(dataExtractorFactory);
        this.context = (ChunkedDataExtractorContext) Objects.requireNonNull(chunkedDataExtractorContext);
        this.currentStart = chunkedDataExtractorContext.start;
        this.currentEnd = chunkedDataExtractorContext.start;
    }

    @Override // org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor
    public boolean hasNext() {
        boolean z = this.currentExtractor != null && this.currentExtractor.hasNext();
        return isCancelled() ? z : z || this.currentEnd < this.context.end;
    }

    @Override // org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor
    public Optional<InputStream> next() throws IOException {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        if (this.currentExtractor == null) {
            setUpChunkedSearch();
        }
        return getNextStream();
    }

    private void setUpChunkedSearch() throws IOException {
        DataSummary requestDataSummary = requestDataSummary();
        if (requestDataSummary.totalHits <= 0) {
            this.currentEnd = this.context.end;
            return;
        }
        this.currentStart = this.context.timeAligner.alignToFloor(requestDataSummary.earliestTime);
        this.currentEnd = this.currentStart;
        this.chunkSpan = this.context.chunkSpan == null ? requestDataSummary.estimateChunk() : this.context.chunkSpan.getMillis();
        this.chunkSpan = this.context.timeAligner.alignToCeil(this.chunkSpan);
        LOGGER.debug("Chunked search configured:  totalHits = {}, dataTimeSpread = {} ms, chunk span = {} ms", Long.valueOf(requestDataSummary.totalHits), Long.valueOf(requestDataSummary.getDataTimeSpread()), Long.valueOf(this.chunkSpan));
    }

    private DataSummary requestDataSummary() throws IOException {
        SearchResponse executeSearchRequest = executeSearchRequest(SearchAction.INSTANCE.newRequestBuilder((ElasticsearchClient) this.client).setSize(0).setIndices(this.context.indices).setTypes(this.context.types).setQuery(ExtractorUtils.wrapInTimeRangeQuery(this.context.query, this.context.timeField, this.currentStart, this.context.end)).addAggregation(AggregationBuilders.min(EARLIEST_TIME).field(this.context.timeField)).addAggregation(AggregationBuilders.max(LATEST_TIME).field(this.context.timeField)));
        ExtractorUtils.checkSearchWasSuccessful(this.context.jobId, executeSearchRequest);
        Aggregations aggregations = executeSearchRequest.getAggregations();
        long j = 0;
        long j2 = 0;
        long totalHits = executeSearchRequest.getHits().getTotalHits();
        if (totalHits > 0) {
            j = (long) ((Min) aggregations.get(EARLIEST_TIME)).getValue();
            j2 = (long) ((Max) aggregations.get(LATEST_TIME)).getValue();
        }
        return new DataSummary(j, j2, totalHits);
    }

    protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
        return searchRequestBuilder.get();
    }

    private Optional<InputStream> getNextStream() throws IOException {
        while (hasNext()) {
            boolean z = false;
            if (this.currentExtractor == null || !this.currentExtractor.hasNext()) {
                advanceTime();
                z = true;
            }
            Optional<InputStream> next = this.currentExtractor.next();
            if (next.isPresent()) {
                return next;
            }
            if (z && hasNext()) {
                setUpChunkedSearch();
            }
        }
        return Optional.empty();
    }

    private void advanceTime() {
        this.currentStart = this.currentEnd;
        this.currentEnd = Math.min(this.currentStart + this.chunkSpan, this.context.end);
        this.currentExtractor = this.dataExtractorFactory.newExtractor(this.currentStart, this.currentEnd);
        LOGGER.trace("advances time to [{}, {})", Long.valueOf(this.currentStart), Long.valueOf(this.currentEnd));
    }

    @Override // org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor
    public boolean isCancelled() {
        return this.isCancelled;
    }

    @Override // org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor
    public void cancel() {
        if (this.currentExtractor != null) {
            this.currentExtractor.cancel();
        }
        this.isCancelled = true;
    }

    ChunkedDataExtractorContext getContext() {
        return this.context;
    }
}
