/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorContext;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationToJsonProcessor;

/**
 * A {@link DataExtractor} that runs one aggregated search and then hands the
 * resulting histogram buckets back to the caller in batches.
 *
 * <p>The search is executed lazily on the first call to {@link #next()}. Its
 * top-level aggregation must be a single {@link Histogram}; the buckets of that
 * histogram are queued and drained by successive {@code next()} calls, each of
 * which returns one batch as an {@link InputStream}.
 *
 * <p>NOTE(review): {@code hasNext} and {@code isCancelled} are plain fields. If
 * {@link #cancel()} is invoked from a different thread than {@link #next()},
 * visibility of the update is not guaranteed by this class - confirm against callers.
 */
class AggregationDataExtractor implements DataExtractor {
    private static final Logger LOGGER = Loggers.getLogger(AggregationDataExtractor.class);

    /**
     * Soft threshold on the number of key-value pairs written per batch.
     * A batch stops accepting buckets once the processor's count reaches this
     * value, so the final bucket of a batch may push the count past it.
     */
    private static final int BATCH_KEY_VALUE_PAIRS = 1000;

    private final Client client;
    private final AggregationDataExtractorContext context;
    private boolean hasNext;
    private boolean isCancelled;
    /** Buckets still to be emitted; {@code null} until the search has been executed. */
    private LinkedList<Histogram.Bucket> histogramBuckets;

    /**
     * @param client               client used to execute the search; must not be {@code null}
     * @param dataExtractorContext search parameters (indices, query, aggregations, time range, ...);
     *                             must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    AggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) {
        this.client = Objects.requireNonNull(client);
        this.context = Objects.requireNonNull(dataExtractorContext);
        this.hasNext = true;
        this.isCancelled = false;
        this.histogramBuckets = null;
    }

    /** @return {@code true} until all buckets have been emitted or the extractor is cancelled */
    @Override
    public boolean hasNext() {
        return hasNext;
    }

    /** @return {@code true} once {@link #cancel()} has been called */
    @Override
    public boolean isCancelled() {
        return isCancelled;
    }

    /** Marks the extractor as cancelled and makes {@link #hasNext()} return {@code false}. */
    @Override
    public void cancel() {
        LOGGER.trace("[{}] Data extractor received cancel request", context.jobId);
        isCancelled = true;
        hasNext = false;
    }

    /**
     * Returns the next batch of data, executing the search first if it has not run yet.
     *
     * @return a stream over the next batch, or an empty {@link Optional} when the search
     *         produced no buckets
     * @throws NoSuchElementException if {@link #hasNext()} is {@code false}
     * @throws IOException            declared by the search and processing steps
     */
    @Override
    public Optional<InputStream> next() throws IOException {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        if (histogramBuckets == null) {
            histogramBuckets = search();
        }
        return Optional.ofNullable(processNextBatch());
    }

    /**
     * Executes the aggregated search and copies the histogram buckets into a mutable queue.
     *
     * @throws IllegalStateException if the search has already been performed
     */
    private LinkedList<Histogram.Bucket> search() throws IOException {
        if (histogramBuckets != null) {
            throw new IllegalStateException("search should only be performed once");
        }
        LOGGER.debug("[{}] Executing aggregated search", context.jobId);
        SearchResponse searchResponse = executeSearchRequest(buildSearchRequest());
        ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
        // Copy so that buckets can be removed as they are processed.
        return new LinkedList<>(getHistogramBuckets(searchResponse.getAggregations()));
    }

    /** Executes the request synchronously; {@code protected} so subclasses can override it. */
    protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
        return searchRequestBuilder.get();
    }

    /**
     * Builds a size-0 search over the configured indices and types, with the context query
     * wrapped in a time-range query, and attaches every configured aggregation.
     */
    private SearchRequestBuilder buildSearchRequest() {
        SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client)
                .setIndices(context.indices)
                .setTypes(context.types)
                .setSize(0)
                .setQuery(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, context.start, context.end));
        context.aggs.getAggregatorFactories().forEach(searchRequestBuilder::addAggregation);
        context.aggs.getPipelineAggregatorFactories().forEach(searchRequestBuilder::addAggregation);
        return searchRequestBuilder;
    }

    /**
     * Extracts the buckets of the single top-level histogram aggregation.
     *
     * @param aggs aggregations of the search response; may be {@code null}
     * @return the histogram buckets, or an empty list when there are no aggregations
     * @throws IllegalArgumentException if there is more than one top-level aggregation or
     *                                  the top-level aggregation is not a {@link Histogram}
     */
    private List<? extends Histogram.Bucket> getHistogramBuckets(@Nullable Aggregations aggs) {
        if (aggs == null) {
            return Collections.emptyList();
        }
        // Typed (not raw) so the Aggregation::getName method reference below type-checks.
        List<Aggregation> aggsAsList = aggs.asList();
        if (aggsAsList.isEmpty()) {
            return Collections.emptyList();
        }
        if (aggsAsList.size() > 1) {
            throw new IllegalArgumentException("Multiple top level aggregations not supported; found: "
                    + aggsAsList.stream().map(Aggregation::getName).collect(Collectors.toList()));
        }
        Aggregation topAgg = aggsAsList.get(0);
        if (topAgg instanceof Histogram) {
            return ((Histogram) topAgg).getBuckets();
        }
        throw new IllegalArgumentException("Top level aggregation should be [histogram]");
    }

    /**
     * Drains queued buckets through an {@link AggregationToJsonProcessor} until the queue is
     * empty or the batch threshold is reached, and clears {@code hasNext} once the queue is empty.
     *
     * @return a stream over the bytes written for this batch, or {@code null} if no buckets remain
     */
    private InputStream processNextBatch() throws IOException {
        if (histogramBuckets.isEmpty()) {
            hasNext = false;
            return null;
        }
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        // The processor is closed before the buffer is read back.
        try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(
                context.timeField, context.fields, context.includeDocCount, outputStream)) {
            while (!histogramBuckets.isEmpty() && processor.getKeyValueCount() < BATCH_KEY_VALUE_PAIRS) {
                processor.process(histogramBuckets.removeFirst());
            }
            if (histogramBuckets.isEmpty()) {
                hasNext = false;
            }
        }
        return new ByteArrayInputStream(outputStream.toByteArray());
    }
}

