/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.search.dispatch.rpc;

import com.yahoo.collections.ListMap;
import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.container.protect.Error;
import com.yahoo.data.access.slime.SlimeAdapter;
import com.yahoo.prelude.Location;
import com.yahoo.prelude.fastsearch.DocumentDatabase;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.TimeoutException;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.query.SessionId;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.slime.BinaryFormat;
import com.yahoo.slime.Cursor;
import com.yahoo.slime.Inspector;
import com.yahoo.slime.Slime;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * {@link FillInvoker} that fills hits with summary data fetched over RPC.
 *
 * <p>{@link #sendFillRequest} groups the result's {@link FastHit}s by distribution key and
 * sends one "get docsums" request per group; {@link #getFillResults} then consumes the
 * responses through the {@link GetDocsumsResponseReceiver} created by the send step.
 */
public class RpcFillInvoker extends FillInvoker {

    private static final Logger log = Logger.getLogger(RpcFillInvoker.class.getName());

    private final DocumentDatabase documentDb;
    private final RpcResourcePool resourcePool;

    /** Receiver created by the most recent {@link #sendFillRequest} call; {@code null} before that. */
    private GetDocsumsResponseReceiver responseReceiver;

    /**
     * @param resourcePool source of node connections and of the compressor
     * @param documentDb   document database whose name and docsum definitions are applied to filled hits
     */
    RpcFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb) {
        this.documentDb = documentDb;
        this.resourcePool = resourcePool;
    }

    /**
     * Sends one summary request per distribution key found among the result's {@link FastHit}s.
     *
     * @param result       the result whose hits should be filled
     * @param summaryClass the summary class to request; may be {@code null}
     * @throws IllegalArgumentException if the query's dispatch compression property does not
     *                                  name a {@link CompressionType} constant
     */
    @Override
    protected void sendFillRequest(Result result, String summaryClass) {
        ListMap<Integer, FastHit> hitsByNode = hitsByNode(result);
        Query query = result.getQuery();

        // Locale.ROOT keeps the enum-name lookup independent of the JVM default locale
        // (a plain toUpperCase() maps 'i' to a dotted capital I under e.g. a Turkish locale).
        String compressionName = query.properties()
                .getString(RpcResourcePool.dispatchCompression, "LZ4")
                .toUpperCase(Locale.ROOT);
        CompressionType compression = CompressionType.valueOf(compressionName);

        if (query.getTraceLevel() >= 3) {
            query.trace("Sending " + hitsByNode.size() + " summary fetch RPC requests", 3);
            query.trace("RpcSlime: Not resending query during document summary fetching", 3);
        }

        this.responseReceiver = new GetDocsumsResponseReceiver(hitsByNode.size(), this.resourcePool.compressor(), result);
        for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) {
            sendGetDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), summaryClass, compression, result, this.responseReceiver);
        }
    }

    /**
     * Processes the responses to the requests sent by {@link #sendFillRequest}, which must
     * have been called first. If waiting for responses times out, a timeout error is added
     * to the result instead of propagating the exception.
     *
     * @param result       the result being filled
     * @param summaryClass the summary class that was requested
     */
    @Override
    protected void getFillResults(Result result, String summaryClass) {
        try {
            this.responseReceiver.processResponses(result.getQuery(), summaryClass, this.documentDb);
            result.hits().setSorted(false);
            result.analyzeHits();
        } catch (TimeoutException e) {
            result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage()));
        }
    }

    /** Does nothing; this invoker holds no resources that need explicit release. */
    @Override
    protected void release() {
    }

    /**
     * Groups the {@link FastHit}s returned by {@code result.hits().unorderedDeepIterator()}
     * by their distribution key. Hits that are not {@link FastHit}s are ignored.
     */
    private static ListMap<Integer, FastHit> hitsByNode(Result result) {
        ListMap<Integer, FastHit> byNode = new ListMap<>();
        Iterator<Hit> hits = result.hits().unorderedDeepIterator();
        while (hits.hasNext()) {
            Hit candidate = hits.next();
            if (candidate instanceof FastHit) {
                FastHit fastHit = (FastHit) candidate;
                byNode.put(fastHit.getDistributionKey(), fastHit);
            }
        }
        return byNode;
    }

    /**
     * Serializes and compresses a docsum request for {@code hits} and sends it to the node
     * with the given id. If the resource pool has no connection for the node, an error
     * response is handed to the receiver (so its response count still adds up), an error is
     * added to the result and a warning is logged.
     *
     * @param nodeId           distribution key of the node to ask
     * @param hits             the hits to request summaries for
     * @param summaryClass     the summary class to request; may be {@code null}
     * @param compression      currently not read by this method; the compression actually
     *                         used is whatever {@code resourcePool.compress} returns
     * @param result           the result being filled
     * @param responseReceiver the receiver that will get the response or error
     */
    private void sendGetDocsumsRequest(int nodeId, List<FastHit> hits, String summaryClass, CompressionType compression, Result result, GetDocsumsResponseReceiver responseReceiver) {
        Client.NodeConnection node = this.resourcePool.getConnection(nodeId);
        if (node == null) {
            String error = "Could not fill hits from unknown node " + nodeId;
            responseReceiver.receive(Client.ResponseOrError.fromError(error));
            result.hits().addError(ErrorMessage.createEmptyDocsums(error));
            log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config");
            return;
        }

        Query query = result.getQuery();
        String rankProfile = query.getRanking().getProfile();
        Slime request = toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), query.getSessionId(), query.getRanking().getLocation(), hits);
        byte[] serializedSlime = BinaryFormat.encode(request);

        // Remaining query time minus 3 ms, in seconds. NOTE(review): the 3 ms look like
        // headroom for handling the response; the value is negative when less than 3 ms remain.
        double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;

        Compressor.Compression compressionResult = this.resourcePool.compress(query, serializedSlime);
        node.getDocsums(hits, compressionResult.type(), serializedSlime.length, compressionResult.data(), responseReceiver, timeoutSeconds);
    }

    /**
     * Builds the Slime request object. The optional fields {@code class}, {@code sessionid},
     * {@code doctype}, {@code ranking} and {@code location} are only written when the
     * corresponding argument is non-null; {@code gids} always holds one raw global id per hit.
     */
    private static Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, Location location, List<FastHit> hits) {
        Slime slime = new Slime();
        Cursor root = slime.setObject();
        if (summaryClass != null) {
            root.setString("class", summaryClass);
        }
        if (sessionId != null) {
            root.setData("sessionid", sessionId.asUtf8String().getBytes());
        }
        if (docType != null) {
            root.setString("doctype", docType);
        }
        if (rankProfile != null) {
            root.setString("ranking", rankProfile);
        }
        if (location != null) {
            root.setString("location", location.backendString());
        }
        Cursor gids = root.setArray("gids");
        for (FastHit hit : hits) {
            gids.addData(hit.getRawGlobalId());
        }
        return slime;
    }

    /**
     * Collects docsum responses on a blocking queue and applies them to the result.
     * {@link #receive} enqueues a response or error; {@link #processResponses} polls the
     * queue until the expected number of responses has been handled or the query runs out
     * of time.
     */
    public static class GetDocsumsResponseReceiver {

        private final BlockingQueue<Client.ResponseOrError<Client.GetDocsumsResponse>> responses;
        private final Compressor compressor;
        private final Result result;

        /** Set once the first communication error has been reported, to avoid repeating it per node. */
        private boolean hasReportedError = false;

        /** Number of responses not yet taken off the queue by {@link #processResponses}. */
        private int outstandingResponses;

        /**
         * @param requestCount number of responses to expect; may be zero
         * @param compressor   used to decompress response payloads
         * @param result       the result that errors and summaries are applied to
         */
        GetDocsumsResponseReceiver(int requestCount, Compressor compressor, Result result) {
            this.compressor = compressor;
            // LinkedBlockingQueue rejects a capacity of 0, which would otherwise make filling
            // a result without any FastHit fail with IllegalArgumentException.
            this.responses = new LinkedBlockingQueue<>(Math.max(1, requestCount));
            this.outstandingResponses = requestCount;
            this.result = result;
        }

        /**
         * Enqueues a response or error for later processing.
         *
         * @throws IllegalStateException if the queue is already at capacity
         */
        public void receive(Client.ResponseOrError<Client.GetDocsumsResponse> response) {
            this.responses.add(response);
        }

        /** Always throws a {@link TimeoutException} stating how many responses are outstanding. */
        private void throwTimeout() throws TimeoutException {
            throw new TimeoutException("Timed out waiting for summary data. " + this.outstandingResponses + " responses outstanding.");
        }

        /**
         * Waits for and processes every outstanding response, bounded by the query's time left.
         * If any hit lacked summary data, one aggregated "empty docsums" error is added to the
         * result after all responses have been processed.
         *
         * <p>If the waiting thread is interrupted, processing stops, the thread's interrupt
         * flag is restored and an error is added to the result.
         *
         * @param query        the query, consulted for its remaining time
         * @param summaryClass the summary class that was requested
         * @param documentDb   supplies the document name and docsum definition for filled hits
         * @throws TimeoutException if the query has no time left, or no response arrives in time
         */
        void processResponses(Query query, String summaryClass, DocumentDatabase documentDb) throws TimeoutException {
            try {
                int skippedHits = 0;
                while (this.outstandingResponses > 0) {
                    long timeLeftMs = query.getTimeLeft();
                    if (timeLeftMs <= 0L) {
                        throwTimeout();
                    }
                    Client.ResponseOrError<Client.GetDocsumsResponse> response = this.responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
                    if (response == null) {
                        throwTimeout();
                    }
                    skippedHits += processResponse(response, summaryClass, documentDb);
                    --this.outstandingResponses;
                }
                if (skippedHits != 0) {
                    this.result.hits().addError(ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " + summaryClass + " for " + skippedHits + " hits"));
                }
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can observe the interruption,
                // and make the incomplete fill visible in the result instead of hiding it.
                Thread.currentThread().interrupt();
                this.result.hits().addError(ErrorMessage.createBackendCommunicationError(
                        "Interrupted while waiting for summary data. " + this.outstandingResponses + " responses outstanding."));
            }
        }

        /**
         * Handles a single response. A successful response is decompressed and used to fill
         * its hits. Only the first error response is added to the result and logged; later
         * error responses are ignored.
         *
         * @return the number of hits in this response that had no valid summary; 0 for errors
         */
        private int processResponse(Client.ResponseOrError<Client.GetDocsumsResponse> responseOrError, String summaryClass, DocumentDatabase documentDb) {
            if (responseOrError.error().isPresent()) {
                if (this.hasReportedError) {
                    return 0;
                }
                // Remember that an error has been reported so the guard above takes effect.
                this.hasReportedError = true;
                String error = responseOrError.error().get();
                this.result.hits().addError(ErrorMessage.createBackendCommunicationError(error));
                log.log(Level.WARNING, "Error fetching summary data: " + error);
                return 0;
            }

            Client.GetDocsumsResponse response = responseOrError.response().get();
            CompressionType compression = CompressionType.valueOf((byte)response.compression());
            byte[] slimeBytes = this.compressor.decompress(response.compressedSlimeBytes(), compression, response.uncompressedSize());
            return fill(response.hitsContext(), summaryClass, documentDb, slimeBytes);
        }

        /**
         * Adds one error to the result per entry in {@code errors}. An entry whose
         * {@code type} field equals "timeout" (ignoring case) gets the timeout error code,
         * any other entry the unspecified error code.
         */
        private void addErrors(Inspector errors) {
            errors.traverse((index, value) -> {
                int errorCode = "timeout".equalsIgnoreCase(value.field("type").asString()) ? Error.TIMEOUT.code : Error.UNSPECIFIED.code;
                this.result.hits().addError(new ErrorMessage(errorCode, value.field("message").asString(), value.field("details").asString()));
            });
        }

        /**
         * Decodes a response payload and applies its summaries to {@code hits}, matching
         * summaries to hits by position. Errors listed in the payload are added to the result.
         *
         * @return the number of hits for which the payload had no valid {@code docsum};
         *         0 if the payload has no valid {@code docsums} field at all
         */
        private int fill(List<FastHit> hits, String summaryClass, DocumentDatabase documentDb, byte[] slimeBytes) {
            Cursor root = BinaryFormat.decode(slimeBytes).get();

            Inspector errors = root.field("errors");
            if (errors.valid() && errors.entries() > 0) {
                addErrors(errors);
            }

            SlimeAdapter summaries = new SlimeAdapter(root.field("docsums"));
            if (!summaries.valid()) {
                return 0;
            }

            int skippedHits = 0;
            for (int i = 0; i < hits.size(); ++i) {
                com.yahoo.data.access.Inspector summary = summaries.entry(i).field("docsum");
                if (!summary.valid()) {
                    ++skippedHits;
                    continue;
                }
                FastHit hit = hits.get(i);
                hit.setField("sddocname", documentDb.getName());
                hit.addSummary(documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary);
                hit.setFilled(summaryClass);
            }
            return skippedHits;
        }
    }
}

