/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.search.dispatch;

import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableMap;
import com.yahoo.collections.ListMap;
import com.yahoo.component.AbstractComponent;
import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.data.access.slime.SlimeAdapter;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.TimeoutException;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.Client;
import com.yahoo.search.dispatch.RpcClient;
import com.yahoo.search.dispatch.SearchCluster;
import com.yahoo.search.query.SessionId;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.slime.BinaryFormat;
import com.yahoo.slime.Cursor;
import com.yahoo.slime.Inspector;
import com.yahoo.slime.Slime;
import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

@Beta
public class Dispatcher
extends AbstractComponent {
    private static final Logger log = Logger.getLogger(Dispatcher.class.getName());
    private final Client client;
    private final SearchCluster searchCluster;
    private final ImmutableMap<Integer, Client.NodeConnection> nodeConnections;
    private final Compressor compressor = new Compressor();

    public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) {
        this.client = new RpcClient();
        this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus);
        ImmutableMap.Builder nodeConnectionsBuilder = new ImmutableMap.Builder();
        for (DispatchConfig.Node node : dispatchConfig.node()) {
            nodeConnectionsBuilder.put((Object)node.key(), (Object)this.client.createConnection(node.host(), node.port()));
        }
        this.nodeConnections = nodeConnectionsBuilder.build();
    }

    public Dispatcher(Map<Integer, Client.NodeConnection> nodeConnections, Client client) {
        this.searchCluster = null;
        this.nodeConnections = ImmutableMap.copyOf(nodeConnections);
        this.client = client;
    }

    public SearchCluster searchCluster() {
        return this.searchCluster;
    }

    public void fill(Result result, String summaryClass, CompressionType compression) {
        try {
            ListMap<Integer, FastHit> hitsByNode = Dispatcher.hitsByNode(result);
            GetDocsumsResponseReceiver responseReceiver = new GetDocsumsResponseReceiver(hitsByNode.size(), this.compressor, result);
            for (Map.Entry nodeHits : hitsByNode.entrySet()) {
                this.sendGetDocsumsRequest((Integer)nodeHits.getKey(), (List)nodeHits.getValue(), summaryClass, compression, result, responseReceiver);
            }
            responseReceiver.processResponses(result.getQuery());
        }
        catch (TimeoutException e) {
            result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage()));
        }
    }

    private static ListMap<Integer, FastHit> hitsByNode(Result result) {
        ListMap hitsByPartition = new ListMap();
        Iterator<Hit> i = result.hits().unorderedDeepIterator();
        while (i.hasNext()) {
            Hit h = i.next();
            if (!(h instanceof FastHit)) continue;
            FastHit hit = (FastHit)h;
            hitsByPartition.put((Object)hit.getDistributionKey(), (Object)hit);
        }
        return hitsByPartition;
    }

    private void sendGetDocsumsRequest(int nodeId, List<FastHit> hits, String summaryClass, CompressionType compression, Result result, GetDocsumsResponseReceiver responseReceiver) {
        Client.NodeConnection node = (Client.NodeConnection)this.nodeConnections.get((Object)nodeId);
        if (node == null) {
            result.hits().addError(ErrorMessage.createEmptyDocsums("Could not fill hits from unknown node " + nodeId));
            log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config");
            return;
        }
        Query query = result.getQuery();
        String rankProfile = query.getRanking().getProfile();
        byte[] serializedSlime = BinaryFormat.encode((Slime)Dispatcher.toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), query.getSessionId(false), hits));
        double timeoutSeconds = ((double)query.getTimeLeft() - 3.0) / 1000.0;
        Compressor.Compression compressionResult = this.compressor.compress(compression, serializedSlime);
        this.client.getDocsums(hits, node, compressionResult.type(), serializedSlime.length, compressionResult.data(), responseReceiver, timeoutSeconds);
    }

    private static Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List<FastHit> hits) {
        Slime slime = new Slime();
        Cursor root = slime.setObject();
        if (summaryClass != null) {
            root.setString("class", summaryClass);
        }
        if (sessionId != null) {
            root.setData("sessionid", sessionId.asUtf8String().getBytes());
        }
        if (docType != null) {
            root.setString("doctype", docType);
        }
        if (rankProfile != null) {
            root.setString("ranking", rankProfile);
        }
        Cursor gids = root.setArray("gids");
        for (FastHit hit : hits) {
            gids.addData(hit.getGlobalId().getRawId());
        }
        return slime;
    }

    public void deconstruct() {
        for (Client.NodeConnection nodeConnection : this.nodeConnections.values()) {
            nodeConnection.close();
        }
    }

    public static class GetDocsumsResponseReceiver {
        private final BlockingQueue<Client.GetDocsumsResponseOrError> responses;
        private final Compressor compressor;
        private final Result result;
        private boolean hasReportedError = false;
        private int outstandingResponses;

        public GetDocsumsResponseReceiver(int requestCount, Compressor compressor, Result result) {
            this.compressor = compressor;
            this.responses = new LinkedBlockingQueue<Client.GetDocsumsResponseOrError>(requestCount);
            this.outstandingResponses = requestCount;
            this.result = result;
        }

        public void receive(Client.GetDocsumsResponseOrError response) {
            this.responses.add(response);
        }

        private void throwTimeout() throws TimeoutException {
            throw new TimeoutException("Timed out waiting for summary data. " + this.outstandingResponses + " responses outstanding.");
        }

        public void processResponses(Query query) throws TimeoutException {
            try {
                while (this.outstandingResponses > 0) {
                    Client.GetDocsumsResponseOrError response;
                    long timeLeftMs = query.getTimeLeft();
                    if (timeLeftMs <= 0L) {
                        this.throwTimeout();
                    }
                    if ((response = this.responses.poll(timeLeftMs, TimeUnit.MILLISECONDS)) == null) {
                        this.throwTimeout();
                    }
                    this.processResponse(response);
                    --this.outstandingResponses;
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }

        private void processResponse(Client.GetDocsumsResponseOrError responseOrError) {
            if (responseOrError.error().isPresent()) {
                if (this.hasReportedError) {
                    return;
                }
                String error = responseOrError.error().get();
                this.result.hits().addError(ErrorMessage.createBackendCommunicationError(error));
                log.log(Level.WARNING, "Error fetching summary data: " + error);
            } else {
                Client.GetDocsumsResponse response = responseOrError.response().get();
                CompressionType compression = CompressionType.valueOf((byte)response.compression());
                byte[] slimeBytes = this.compressor.decompress(response.compressedSlimeBytes(), compression, response.uncompressedSize());
                this.fill(response.hitsContext(), slimeBytes);
            }
        }

        private void fill(List<FastHit> hits, byte[] slimeBytes) {
            SlimeAdapter summaries = new SlimeAdapter((Inspector)BinaryFormat.decode((byte[])slimeBytes).get().field("docsums"));
            if (!summaries.valid()) {
                throw new IllegalArgumentException("Expected a Slime root object containing a 'docsums' field");
            }
            for (int i = 0; i < hits.size(); ++i) {
                this.fill(hits.get(i), summaries.entry(i).field("docsum"));
            }
        }

        private void fill(FastHit hit, com.yahoo.data.access.Inspector summary) {
            hit.reserve(summary.fieldCount());
            summary.traverse((name, value) -> hit.setField(name, this.nativeTypeOf(value)));
        }

        private Object nativeTypeOf(com.yahoo.data.access.Inspector inspector) {
            switch (inspector.type()) {
                case ARRAY: {
                    return inspector;
                }
                case OBJECT: {
                    return inspector;
                }
                case BOOL: {
                    return inspector.asBool();
                }
                case DATA: {
                    return inspector.asData();
                }
                case DOUBLE: {
                    return inspector.asDouble();
                }
                case LONG: {
                    return inspector.asLong();
                }
                case STRING: {
                    return inspector.asString();
                }
                case EMPTY: {
                    return null;
                }
            }
            throw new IllegalArgumentException("Unexpected Slime type " + inspector.type());
        }
    }
}

