/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.search.dispatch;

import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.CloseableInvoker;
import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.InvokerResult;
import com.yahoo.search.dispatch.LeanHit;
import com.yahoo.search.dispatch.ResponseMonitor;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.search.searchchain.Execution;
import com.yahoo.vespa.config.search.DispatchConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/**
 * A {@link SearchInvoker} that forwards a query to a collection of underlying invokers and
 * merges their results as responses become available.
 *
 * <p>The underlying invokers report readiness through {@link #responseAvailable(SearchInvoker)},
 * which places them on a blocking queue. {@link #getSearchResult(Execution)} drains that queue,
 * merges the lean hits of each partial result in {@code compareTo} order, and accumulates
 * coverage information. Once enough nodes have answered (as decided by the dispatch config's
 * minimum search coverage), the wait for the remaining nodes is shortened by
 * {@link #nextTimeout()}.
 *
 * <p>The mutable bookkeeping fields are plain (non-volatile, unsynchronized) fields; only the
 * queue is a concurrent structure.
 */
public class InterleavedSearchInvoker extends SearchInvoker implements ResponseMonitor<SearchInvoker> {
    private static final Logger log = Logger.getLogger(InterleavedSearchInvoker.class.getName());

    // Bit values combined into the coverage "degraded reason" in createCoverage().
    // NOTE(review): presumably these mirror the constants understood by
    // Coverage.setDegradedReason - confirm against the Coverage class.
    private static final int DEGRADED_BY_MATCH_PHASE = 1;
    private static final int DEGRADED_BY_TIMEOUT = 2;
    private static final int DEGRADED_BY_ADAPTIVE_TIMEOUT = 4;

    /** Invokers that have not yet been processed; identity-based so equal invokers stay distinct. */
    private final Set<SearchInvoker> invokers = Collections.newSetFromMap(new IdentityHashMap<>());
    private final SearchCluster searchCluster;
    private final Group group;
    /** Invokers that have signalled {@link #responseAvailable(SearchInvoker)} and await merging. */
    private final LinkedBlockingQueue<SearchInvoker> availableForProcessing;
    /** Distribution keys of nodes that had already failed before this invoker was built; may be null. */
    private final Set<Integer> alreadyFailedNodes;
    private Query query;
    private boolean adaptiveTimeoutCalculated = false;
    private long adaptiveTimeoutMin = 0L;
    private long adaptiveTimeoutMax = 0L;
    private long deadline = 0L;
    private long answeredDocs = 0L;
    private long answeredActiveDocs = 0L;
    private long answeredSoonActiveDocs = 0L;
    private int askedNodes = 0;
    private int answeredNodes = 0;
    private int answeredNodesParticipated = 0;
    private boolean timedOut = false;
    private boolean degradedByMatchPhase = false;

    /**
     * Creates an invoker wrapping the given underlying invokers.
     *
     * @param invokers the invokers to fan out to; copied into an identity-based set
     * @param searchCluster the cluster used for hit estimation and dispatch configuration
     * @param group the group consulted for balance/sparseness when estimating hits to fetch
     * @param alreadyFailedNodes distribution keys of nodes known to have failed, or null
     */
    public InterleavedSearchInvoker(Collection<SearchInvoker> invokers, SearchCluster searchCluster, Group group, Set<Integer> alreadyFailedNodes) {
        super(Optional.empty());
        this.invokers.addAll(invokers);
        this.searchCluster = searchCluster;
        this.group = group;
        // newQueue() is an overridable factory; subclasses overriding it must not rely on
        // their own state, as it runs before their constructor body.
        this.availableForProcessing = this.newQueue();
        this.alreadyFailedNodes = alreadyFailedNodes;
    }

    /**
     * Sends the query to every underlying invoker.
     *
     * <p>While dispatching, the query's hits are set to the number of hits to fetch per node and
     * its offset to 0. The original hits and offset are restored before returning, also when an
     * underlying invoker throws.
     *
     * @param query the query to dispatch; retained for later use by {@link #getSearchResult}
     * @param unusedContext ignored
     * @return always null
     * @throws IOException if an underlying invoker fails to send the request
     */
    @Override
    protected Object sendSearchRequest(Query query, Object unusedContext) throws IOException {
        this.query = query;
        invokers.forEach(child -> child.setMonitor(this));
        deadline = currentTime() + query.getTimeLeft();

        int originalHits = query.getHits();
        int originalOffset = query.getOffset();
        int neededHits = originalHits + originalOffset;
        int hitsToFetch = neededHits;
        if (group.isBalanced() && !group.isSparse()) {
            Double topKProbabilityOverride = query.properties().getDouble(Dispatcher.topKProbability);
            hitsToFetch = topKProbabilityOverride != null
                    ? searchCluster.estimateHitsToFetch(neededHits, invokers.size(), topKProbabilityOverride)
                    : searchCluster.estimateHitsToFetch(neededHits, invokers.size());
        }

        query.setHits(hitsToFetch);
        query.setOffset(0);
        try {
            // Each invoker receives the context returned by the previous one.
            Object context = null;
            for (SearchInvoker invoker : invokers) {
                context = invoker.sendSearchRequest(query, context);
                ++askedNodes;
            }
        } finally {
            // Always hand the query back with the caller's paging parameters intact.
            query.setHits(originalHits);
            query.setOffset(originalOffset);
        }
        return null;
    }

    /**
     * Collects and merges the results of the underlying invokers as they become available.
     *
     * <p>Waits on the response queue until all invokers have been processed or the current
     * timeout expires. Invokers still pending afterwards are reported by
     * {@link #insertNetworkErrors(Result)}. The returned result contains the merged lean hits in
     * the window {@code [offset, offset + hits)}, after which the query's offset is reset to 0.
     *
     * @param execution passed through to the underlying invokers
     * @return the merged result with coverage set
     * @throws IOException if an underlying invoker fails while producing its result
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt flag
     *         is restored before throwing
     */
    @Override
    protected InvokerResult getSearchResult(Execution execution) throws IOException {
        InvokerResult result = new InvokerResult(query, query.getHits());
        List<LeanHit> merged = Collections.emptyList();
        long nextTimeout = query.getTimeLeft();
        // Detailed logging is only enabled for the specific hits == 7, offset == 0 query shape.
        boolean extraDebug = query.getOffset() == 0 && query.getHits() == 7 && log.isLoggable(Level.FINE);
        List<InvokerResult> processed = new ArrayList<>();
        try {
            while (!invokers.isEmpty() && nextTimeout >= 0L) {
                SearchInvoker invoker = availableForProcessing.poll(nextTimeout, TimeUnit.MILLISECONDS);
                if (invoker == null) {
                    log.fine(() -> "Search timed out with " + askedNodes + " requests made, " + answeredNodes + " responses received");
                    break;
                }
                InvokerResult toMerge = invoker.getSearchResult(execution);
                if (extraDebug) {
                    processed.add(toMerge);
                }
                merged = mergeResult(result.getResult(), toMerge, merged);
                ejectInvoker(invoker);
                nextTimeout = nextTimeout();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for search results", e);
        }

        insertNetworkErrors(result.getResult());
        result.getResult().setCoverage(createCoverage());

        if (extraDebug) {
            logInterleavingDetails(processed, merged);
        }

        int needed = query.getOffset() + query.getHits();
        for (int index = query.getOffset(); index < merged.size() && index < needed; ++index) {
            result.getLeanHits().add(merged.get(index));
        }
        query.setOffset(0);
        return result;
    }

    /**
     * Emits fine-level debug output about how results were interleaved.
     *
     * <p>If the merged hits come from more than one part id, only the first deviating hit is
     * logged. Otherwise (all hits from one part id, or no hits) every processed partial result
     * and every merged hit is logged.
     */
    private void logInterleavingDetails(List<InvokerResult> processed, List<LeanHit> merged) {
        if (!merged.isEmpty()) {
            int firstPartId = merged.get(0).getPartId();
            for (int index = 1; index < merged.size(); ++index) {
                int partId = merged.get(index).getPartId();
                if (partId != firstPartId) {
                    log.fine("merged[" + index + "/" + merged.size() + "] from partId " + partId + ", first " + firstPartId);
                    return;
                }
            }
        }

        log.fine("Interleaved " + processed.size() + " results");
        for (int pIdx = 0; pIdx < processed.size(); ++pIdx) {
            InvokerResult partial = processed.get(pIdx);
            log.fine("InvokerResult " + pIdx + " total hits " + partial.getResult().getTotalHitCount());
            List<LeanHit> lean = partial.getLeanHits();
            for (int idx = 0; idx < lean.size(); ++idx) {
                LeanHit hit = lean.get(idx);
                log.fine("lean hit " + idx + " relevance " + hit.getRelevance() + " partid " + hit.getPartId());
            }
        }
        for (int mIdx = 0; mIdx < merged.size(); ++mIdx) {
            LeanHit hit = merged.get(mIdx);
            log.fine("merged hit " + mIdx + " relevance " + hit.getRelevance() + " partid " + hit.getPartId());
        }
    }

    /**
     * Reports invokers that never answered and nodes that had already failed.
     *
     * <p>The problems are added to the result as errors only when no node answered at all;
     * otherwise they are only traced on the query at level 2. Already-failed nodes are counted
     * as both asked and answered.
     */
    private void insertNetworkErrors(Result result) {
        // Evaluated before the already-failed nodes are added to answeredNodes below.
        boolean asErrors = answeredNodes == 0;

        if (!invokers.isEmpty()) {
            String keys = invokers.stream()
                    .map(SearchInvoker::distributionKey)
                    .map(dk -> dk.map(i -> i.toString()).orElse("(unspecified)"))
                    .collect(Collectors.joining(", "));
            if (asErrors) {
                result.hits().addError(ErrorMessage.createTimeout("Backend communication timeout on all nodes in group (distribution-keys: " + keys + ")"));
            } else {
                query.trace("Backend communication timeout on nodes with distribution-keys: " + keys, 2);
            }
            timedOut = true;
        }

        if (alreadyFailedNodes != null) {
            String message = "Connection failure on nodes with distribution-keys: "
                    + alreadyFailedNodes.stream().map(v -> Integer.toString(v)).collect(Collectors.joining(", "));
            if (asErrors) {
                result.hits().addError(ErrorMessage.createBackendCommunicationError(message));
            } else {
                query.trace(message, 2);
            }
            int failed = alreadyFailedNodes.size();
            askedNodes += failed;
            answeredNodes += failed;
        }
    }

    /**
     * Computes how long to wait for the next response, in milliseconds.
     *
     * <p>Returns the query's remaining time while all asked nodes have answered, the configured
     * minimum coverage is 100% or more, or fewer nodes than the minimum coverage requires have
     * answered. After that, an adaptive wait is derived from the min/max wait factors (fixed at
     * the first such call) and the number of pending nodes, and the deadline is pulled in
     * accordingly. The returned value may be negative when the deadline has passed.
     */
    private long nextTimeout() {
        DispatchConfig config = searchCluster.dispatchConfig();
        double minimumCoverage = config.minSearchCoverage();
        if (askedNodes == answeredNodes || minimumCoverage >= 100.0) {
            return query.getTimeLeft();
        }
        int minimumResponses = (int) Math.ceil((double) askedNodes * minimumCoverage / 100.0);
        if (answeredNodes < minimumResponses) {
            return query.getTimeLeft();
        }

        long timeLeft = query.getTimeLeft();
        if (!adaptiveTimeoutCalculated) {
            adaptiveTimeoutMin = (long) ((double) timeLeft * config.minWaitAfterCoverageFactor());
            adaptiveTimeoutMax = (long) ((double) timeLeft * config.maxWaitAfterCoverageFactor());
            adaptiveTimeoutCalculated = true;
        }

        long now = currentTime();
        int pendingQueries = askedNodes - answeredNodes;
        double missWidth = (100.0 - config.minSearchCoverage()) * (double) askedNodes / 100.0 - 1.0;
        double slopedWait = adaptiveTimeoutMin;
        if (pendingQueries > 1 && missWidth > 0.0) {
            slopedWait += (double) ((adaptiveTimeoutMax - adaptiveTimeoutMin) * (long) (pendingQueries - 1)) / missWidth;
        }
        long nextAdaptive = (long) slopedWait;
        if (now + nextAdaptive >= deadline) {
            return deadline - now;
        }
        deadline = now + nextAdaptive;
        return nextAdaptive;
    }

    /**
     * Merges one partial result into the accumulated result.
     *
     * <p>Coverage is collected, the underlying {@link Result}s are merged, and auxiliary hits
     * are copied over. The lean hits of {@code partialResult} are then merged with
     * {@code current} in {@code compareTo} order, keeping at most {@code offset + hits} entries.
     *
     * @param result the accumulated result to merge into
     * @param partialResult the newly arrived partial result
     * @param current the lean hits merged so far
     * @return the merged lean hits; may be one of the input lists when the other is empty
     */
    private List<LeanHit> mergeResult(Result result, InvokerResult partialResult, List<LeanHit> current) {
        collectCoverage(partialResult.getResult().getCoverage(true));
        result.mergeWith(partialResult.getResult());
        for (Hit hit : partialResult.getResult().hits().asUnorderedHits()) {
            if (hit.isAuxiliary()) {
                result.hits().add(hit);
            }
        }

        if (current.isEmpty()) {
            return partialResult.getLeanHits();
        }
        List<LeanHit> partial = partialResult.getLeanHits();
        if (partial.isEmpty()) {
            return current;
        }

        int needed = query.getOffset() + query.getHits();
        // The merged list can never hold more than both inputs combined, so avoid
        // preallocating a huge array when a large page size is requested.
        List<LeanHit> merged = new ArrayList<>(Math.min(needed, current.size() + partial.size()));
        int indexCurrent = 0;
        int indexPartial = 0;
        while (indexCurrent < current.size() && indexPartial < partial.size() && merged.size() < needed) {
            LeanHit incomingHit = partial.get(indexPartial);
            LeanHit currentHit = current.get(indexCurrent);
            int cmpRes = currentHit.compareTo(incomingHit);
            if (cmpRes < 0) {
                merged.add(currentHit);
                ++indexCurrent;
            } else if (cmpRes > 0) {
                merged.add(incomingHit);
                ++indexPartial;
            } else {
                // Hits comparing equal are treated as one: keep the current, skip the incoming.
                merged.add(currentHit);
                ++indexCurrent;
                ++indexPartial;
            }
        }
        while (indexCurrent < current.size() && merged.size() < needed) {
            merged.add(current.get(indexCurrent++));
        }
        while (indexPartial < partial.size() && merged.size() < needed) {
            merged.add(partial.get(indexPartial++));
        }
        return merged;
    }

    /** Adds the coverage of one partial result to the running totals and counts the node as answered. */
    private void collectCoverage(Coverage source) {
        answeredDocs += source.getDocs();
        answeredActiveDocs += source.getActive();
        answeredSoonActiveDocs += source.getSoonActive();
        answeredNodesParticipated += source.getNodes();
        ++answeredNodes;
        degradedByMatchPhase |= source.isDegradedByMatchPhase();
        timedOut |= source.isDegradedByTimeout();
    }

    /**
     * Builds the coverage for the merged result from the accumulated totals.
     *
     * <p>The degraded reason combines the timeout bit (adaptive or plain, depending on whether
     * an adaptive timeout was calculated) with the match-phase bit.
     */
    private Coverage createCoverage() {
        adjustDegradedCoverage();
        Coverage coverage = new Coverage(answeredDocs, answeredActiveDocs, answeredNodesParticipated, 1);
        coverage.setNodesTried(askedNodes);
        coverage.setSoonActive(answeredSoonActiveDocs);
        int degradedReason = 0;
        if (timedOut) {
            degradedReason |= adaptiveTimeoutCalculated ? DEGRADED_BY_ADAPTIVE_TIMEOUT : DEGRADED_BY_TIMEOUT;
        }
        if (degradedByMatchPhase) {
            degradedReason |= DEGRADED_BY_MATCH_PHASE;
        }
        coverage.setDegradedReason(degradedReason);
        return coverage;
    }

    /**
     * Extrapolates the active and soon-active document counts for nodes that did not participate,
     * using the per-node average of the nodes that did.
     *
     * <p>With an adaptive timeout in effect all non-answering nodes are extrapolated. Otherwise
     * the count is reduced by {@code searchableCopies - 1} and the result is marked as timed out.
     */
    private void adjustDegradedCoverage() {
        if (askedNodes == answeredNodesParticipated) {
            return;
        }
        int notAnswered = askedNodes - answeredNodesParticipated;
        if (adaptiveTimeoutCalculated && answeredNodesParticipated > 0) {
            answeredActiveDocs += (long) notAnswered * answeredActiveDocs / (long) answeredNodesParticipated;
            answeredSoonActiveDocs += (long) notAnswered * answeredSoonActiveDocs / (long) answeredNodesParticipated;
        } else if (askedNodes > answeredNodesParticipated) {
            int searchableCopies = (int) searchCluster.dispatchConfig().searchableCopies();
            // NOTE(review): missingNodes is negative when searchableCopies - 1 > notAnswered,
            // which lowers the active doc counts below - confirm this is intended.
            int missingNodes = notAnswered - (searchableCopies - 1);
            if (answeredNodesParticipated > 0) {
                answeredActiveDocs += (long) missingNodes * answeredActiveDocs / (long) answeredNodesParticipated;
                answeredSoonActiveDocs += (long) missingNodes * answeredSoonActiveDocs / (long) answeredNodesParticipated;
                timedOut = true;
            }
        }
    }

    /** Removes a processed invoker from the pending set and releases it. */
    private void ejectInvoker(SearchInvoker invoker) {
        invokers.remove(invoker);
        invoker.release();
    }

    /** Closes and forgets every invoker that is still pending. */
    @Override
    protected void release() {
        if (!invokers.isEmpty()) {
            invokers.forEach(CloseableInvoker::close);
            invokers.clear();
        }
    }

    /** Queues {@code from} so that {@link #getSearchResult(Execution)} picks up its result. */
    @Override
    public void responseAvailable(SearchInvoker from) {
        if (availableForProcessing != null) {
            availableForProcessing.add(from);
        }
    }

    /** Intentionally a no-op: the given monitor is ignored. */
    @Override
    protected void setMonitor(ResponseMonitor<SearchInvoker> monitor) {
    }

    /** Returns the current wall-clock time in milliseconds; overridable. */
    protected long currentTime() {
        return System.currentTimeMillis();
    }

    /** Creates the queue used to hand over invokers with available responses; overridable. */
    protected LinkedBlockingQueue<SearchInvoker> newQueue() {
        return new LinkedBlockingQueue<>();
    }

    /** Returns the live (not copied) set of invokers that are still pending. */
    Collection<SearchInvoker> invokers() {
        return invokers;
    }
}

