/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.search.dispatch;

import com.yahoo.concurrent.Timer;
import com.yahoo.prelude.fastsearch.GroupingListHit;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.AdaptiveTimeoutHandler;
import com.yahoo.search.dispatch.CloseableInvoker;
import com.yahoo.search.dispatch.CoverageAggregator;
import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.GroupingResultAggregator;
import com.yahoo.search.dispatch.InvokerResult;
import com.yahoo.search.dispatch.LeanHit;
import com.yahoo.search.dispatch.ResponseMonitor;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.dispatch.SimpleTimeoutHandler;
import com.yahoo.search.dispatch.TimeoutHandler;
import com.yahoo.search.dispatch.TopKEstimator;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.vespa.config.search.DispatchConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class InterleavedSearchInvoker
extends SearchInvoker
implements ResponseMonitor<SearchInvoker> {
    private static final Logger log = Logger.getLogger(InterleavedSearchInvoker.class.getName());
    private final Timer timer;
    private final Set<SearchInvoker> invokers;
    private final DispatchConfig dispatchConfig;
    private final Group group;
    private final LinkedBlockingQueue<SearchInvoker> availableForProcessing;
    private final Set<Integer> alreadyFailedNodes;
    private final CoverageAggregator coverageAggregator;
    private final TopKEstimator hitEstimator;
    private Query query;
    private TimeoutHandler timeoutHandler;

    public InterleavedSearchInvoker(Timer timer, Collection<SearchInvoker> invokers, TopKEstimator hitEstimator, DispatchConfig dispatchConfig, Group group, Set<Integer> alreadyFailedNodes) {
        super(Optional.empty());
        this.timer = timer;
        this.invokers = Collections.newSetFromMap(new IdentityHashMap());
        this.invokers.addAll(invokers);
        this.dispatchConfig = dispatchConfig;
        this.group = group;
        this.availableForProcessing = this.newQueue();
        this.alreadyFailedNodes = alreadyFailedNodes;
        this.coverageAggregator = new CoverageAggregator(invokers.size());
        this.hitEstimator = hitEstimator;
    }

    private int estimateHitsToFetch(int wantedHits, int numPartitions) {
        return this.hitEstimator.estimateK(wantedHits, numPartitions);
    }

    private int estimateHitsToFetch(int wantedHits, int numPartitions, double topKProbability) {
        return this.hitEstimator.estimateK(wantedHits, numPartitions, topKProbability);
    }

    private TimeoutHandler createTimeoutHandler(DispatchConfig config, int askedNodes, Query query) {
        return config.minSearchCoverage() < 100.0 ? new AdaptiveTimeoutHandler(this.timer, config, askedNodes, query) : new SimpleTimeoutHandler(query);
    }

    @Override
    protected Object sendSearchRequest(Query query, Object unusedContext) throws IOException {
        int neededHits;
        this.query = query;
        this.invokers.forEach(invoker -> invoker.setMonitor(this));
        int originalHits = query.getHits();
        int originalOffset = query.getOffset();
        int q = neededHits = originalHits + originalOffset;
        if (this.group.isBalanced() && !this.group.isSparse()) {
            Double topkProbabilityOverrride = query.properties().getDouble(Dispatcher.topKProbability);
            q = topkProbabilityOverrride != null ? this.estimateHitsToFetch(neededHits, this.invokers.size(), topkProbabilityOverrride) : this.estimateHitsToFetch(neededHits, this.invokers.size());
        }
        query.setHits(q);
        query.setOffset(0);
        Object context = null;
        for (SearchInvoker invoker2 : this.invokers) {
            context = invoker2.sendSearchRequest(query, context);
        }
        this.timeoutHandler = this.createTimeoutHandler(this.dispatchConfig, this.invokers.size(), query);
        query.setHits(originalHits);
        query.setOffset(originalOffset);
        return null;
    }

    @Override
    protected InvokerResult getSearchResult() throws IOException {
        InvokerResult result = new InvokerResult(this.query, this.query.getHits());
        List<LeanHit> merged = List.of();
        long nextTimeout = this.query.getTimeLeft();
        GroupingResultAggregator groupingResultAggregator = new GroupingResultAggregator();
        try {
            while (!this.invokers.isEmpty() && nextTimeout >= 0L) {
                SearchInvoker invoker = this.availableForProcessing.poll(nextTimeout, TimeUnit.MILLISECONDS);
                if (invoker == null) {
                    log.fine(() -> "Search timed out with " + this.coverageAggregator.getAskedNodes() + " requests made, " + this.coverageAggregator.getAnsweredNodes() + " responses received");
                    break;
                }
                InvokerResult toMerge = invoker.getSearchResult();
                merged = this.mergeResult(result.getResult(), toMerge, merged, groupingResultAggregator);
                this.ejectInvoker(invoker);
                nextTimeout = this.timeoutHandler.nextTimeoutMS(this.coverageAggregator.getAnsweredNodes());
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted while waiting for search results", e);
        }
        groupingResultAggregator.toAggregatedHit().ifPresent(h -> result.getResult().hits().add((Hit)h));
        this.insertNetworkErrors(result.getResult());
        CoverageAggregator adjusted = this.coverageAggregator.adjustedDegradedCoverage((int)this.dispatchConfig.redundancy(), this.timeoutHandler);
        result.getResult().setCoverage(adjusted.createCoverage(this.timeoutHandler));
        int needed = this.query.getOffset() + this.query.getHits();
        for (int index = this.query.getOffset(); index < merged.size() && index < needed; ++index) {
            result.getLeanHits().add(merged.get(index));
        }
        this.query.setOffset(0);
        return result;
    }

    private void insertNetworkErrors(Result result) {
        boolean asErrors = this.coverageAggregator.hasNoAnswers();
        if (!this.invokers.isEmpty()) {
            String keys = this.invokers.stream().map(SearchInvoker::distributionKey).map(dk -> dk.map(i -> i.toString()).orElse("(unspecified)")).collect(Collectors.joining(", "));
            if (asErrors) {
                result.hits().addError(ErrorMessage.createTimeout("Backend communication timeout on all nodes in group (distribution-keys: " + keys + ")"));
            } else {
                this.query.trace("Backend communication timeout on nodes with distribution-keys: " + keys, 2);
            }
            this.coverageAggregator.setTimedOut();
        }
        if (this.alreadyFailedNodes != null) {
            String message = "Connection failure on nodes with distribution-keys: " + this.alreadyFailedNodes.stream().map(v -> Integer.toString(v)).collect(Collectors.joining(", "));
            if (asErrors) {
                result.hits().addError(ErrorMessage.createBackendCommunicationError(message));
            } else {
                this.query.trace(message, 2);
            }
            this.coverageAggregator.setFailedNodes(this.alreadyFailedNodes.size());
        }
    }

    private List<LeanHit> mergeResult(Result result, InvokerResult partialResult, List<LeanHit> current, GroupingResultAggregator groupingResultAggregator) {
        this.coverageAggregator.add(partialResult.getResult().getCoverage(true));
        result.mergeWith(partialResult.getResult());
        List<Hit> partialNonLean = partialResult.getResult().hits().asUnorderedHits();
        for (Hit hit : partialNonLean) {
            if (!hit.isAuxiliary()) continue;
            if (hit instanceof GroupingListHit) {
                groupingResultAggregator.mergeWith((GroupingListHit)hit);
                continue;
            }
            result.hits().add(hit);
        }
        if (current.isEmpty()) {
            return partialResult.getLeanHits();
        }
        List<LeanHit> partial = partialResult.getLeanHits();
        if (partial.isEmpty()) {
            return current;
        }
        int needed = this.query.getOffset() + this.query.getHits();
        ArrayList<LeanHit> merged = new ArrayList<LeanHit>(needed);
        int indexCurrent = 0;
        int indexPartial = 0;
        while (indexCurrent < current.size() && indexPartial < partial.size() && merged.size() < needed) {
            LeanHit incomingHit = partial.get(indexPartial);
            LeanHit currentHit = current.get(indexCurrent);
            int cmpRes = currentHit.compareTo(incomingHit);
            if (cmpRes < 0) {
                merged.add(currentHit);
                ++indexCurrent;
                continue;
            }
            if (cmpRes > 0) {
                merged.add(incomingHit);
                ++indexPartial;
                continue;
            }
            merged.add(currentHit);
            ++indexCurrent;
            ++indexPartial;
        }
        this.appendRemainingIfNeeded(merged, needed, current, indexCurrent);
        this.appendRemainingIfNeeded(merged, needed, partial, indexPartial);
        return merged;
    }

    private void appendRemainingIfNeeded(List<LeanHit> merged, int needed, List<LeanHit> hits, int index) {
        while (index < hits.size() && merged.size() < needed) {
            merged.add(hits.get(index++));
        }
    }

    private void ejectInvoker(SearchInvoker invoker) {
        this.invokers.remove(invoker);
        invoker.release();
    }

    @Override
    protected void release() {
        if (!this.invokers.isEmpty()) {
            this.invokers.forEach(CloseableInvoker::close);
            this.invokers.clear();
        }
    }

    @Override
    public void responseAvailable(SearchInvoker from) {
        if (this.availableForProcessing != null) {
            this.availableForProcessing.add(from);
        }
    }

    @Override
    protected void setMonitor(ResponseMonitor<SearchInvoker> monitor) {
    }

    protected LinkedBlockingQueue<SearchInvoker> newQueue() {
        return new LinkedBlockingQueue<SearchInvoker>();
    }

    Collection<SearchInvoker> invokers() {
        return this.invokers;
    }
}

