/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.xpack.ml.datafeed.extractor.scroll;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.ClearScrollAction;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorContext;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.SearchHitToJsonProcessor;
import org.elasticsearch.xpack.ml.utils.DomainSplitFunction;

class ScrollDataExtractor
implements DataExtractor {
    private static final Logger LOGGER = Loggers.getLogger(ScrollDataExtractor.class);
    private static final TimeValue SCROLL_TIMEOUT = new TimeValue(10L, TimeUnit.MINUTES);
    private final Client client;
    private final ScrollDataExtractorContext context;
    private String scrollId;
    private boolean isCancelled;
    private boolean hasNext;
    private Long timestampOnCancel;

    ScrollDataExtractor(Client client, ScrollDataExtractorContext dataExtractorContext) {
        this.client = Objects.requireNonNull(client);
        this.context = Objects.requireNonNull(dataExtractorContext);
        this.hasNext = true;
    }

    @Override
    public boolean hasNext() {
        return this.hasNext;
    }

    @Override
    public boolean isCancelled() {
        return this.isCancelled;
    }

    @Override
    public void cancel() {
        LOGGER.trace("[{}] Data extractor received cancel request", (Object)this.context.jobId);
        this.isCancelled = true;
    }

    @Override
    public Optional<InputStream> next() throws IOException {
        Optional<InputStream> stream;
        if (!this.hasNext()) {
            throw new NoSuchElementException();
        }
        Optional<InputStream> optional = stream = this.scrollId == null ? Optional.ofNullable(this.initScroll()) : Optional.ofNullable(this.continueScroll());
        if (!stream.isPresent()) {
            this.hasNext = false;
        }
        return stream;
    }

    private InputStream initScroll() throws IOException {
        LOGGER.debug("[{}] Initializing scroll", (Object)this.context.jobId);
        SearchResponse searchResponse = this.executeSearchRequest(this.buildSearchRequest());
        return this.processSearchResponse(searchResponse);
    }

    protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
        return (SearchResponse)searchRequestBuilder.get();
    }

    private SearchRequestBuilder buildSearchRequest() {
        SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder((ElasticsearchClient)this.client).setScroll(SCROLL_TIMEOUT).addSort(this.context.extractedFields.timeField(), SortOrder.ASC).setIndices(this.context.indexes).setTypes(this.context.types).setSize(this.context.scrollSize).setQuery(ExtractorUtils.wrapInTimeRangeQuery(this.context.query, this.context.extractedFields.timeField(), this.context.start, this.context.end));
        for (String docValueField : this.context.extractedFields.getDocValueFields()) {
            searchRequestBuilder.addDocValueField(docValueField);
        }
        String[] sourceFields = this.context.extractedFields.getSourceFields();
        if (sourceFields.length == 0) {
            searchRequestBuilder.setFetchSource(false);
            searchRequestBuilder.storedFields(new String[]{"_none_"});
        } else {
            searchRequestBuilder.setFetchSource(sourceFields, null);
        }
        this.context.scriptFields.forEach(f -> searchRequestBuilder.addScriptField(f.fieldName(), this.injectDomainSplit(f.script())));
        return searchRequestBuilder;
    }

    private Script injectDomainSplit(Script script) {
        String code = script.getIdOrCode();
        if (code.contains("domainSplit(") && script.getLang().equals("painless")) {
            String modifiedCode = DomainSplitFunction.function + code;
            HashMap<String, Object> modifiedParams = new HashMap<String, Object>(script.getParams().size() + DomainSplitFunction.params.size());
            modifiedParams.putAll(script.getParams());
            modifiedParams.putAll(DomainSplitFunction.params);
            return new Script(script.getType(), script.getLang(), modifiedCode, modifiedParams);
        }
        return script;
    }

    private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException {
        ExtractorUtils.checkSearchWasSuccessful(this.context.jobId, searchResponse);
        this.scrollId = searchResponse.getScrollId();
        if (searchResponse.getHits().hits().length == 0) {
            this.hasNext = false;
            this.clearScroll(this.scrollId);
            return null;
        }
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(this.context.extractedFields, outputStream);){
            for (SearchHit hit : searchResponse.getHits().hits()) {
                Long timestamp;
                if (this.isCancelled && (timestamp = this.context.extractedFields.timeFieldValue(hit)) != null) {
                    if (this.timestampOnCancel == null) {
                        this.timestampOnCancel = timestamp;
                    } else if (!timestamp.equals(this.timestampOnCancel)) {
                        this.hasNext = false;
                        this.clearScroll(this.scrollId);
                        break;
                    }
                }
                hitProcessor.process(hit);
            }
        }
        return new ByteArrayInputStream(outputStream.toByteArray());
    }

    private InputStream continueScroll() throws IOException {
        LOGGER.debug("[{}] Continuing scroll with id [{}]", (Object)this.context.jobId, (Object)this.scrollId);
        SearchResponse searchResponse = this.executeSearchScrollRequest(this.scrollId);
        return this.processSearchResponse(searchResponse);
    }

    protected SearchResponse executeSearchScrollRequest(String scrollId) {
        return (SearchResponse)SearchScrollAction.INSTANCE.newRequestBuilder((ElasticsearchClient)this.client).setScroll(SCROLL_TIMEOUT).setScrollId(scrollId).get();
    }

    void clearScroll(String scrollId) {
        ClearScrollAction.INSTANCE.newRequestBuilder((ElasticsearchClient)this.client).addScrollId(scrollId).get();
    }
}

