/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.search.dispatch.rpc;

import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
import com.yahoo.search.dispatch.InvokerResult;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.dispatch.rpc.Client;
import com.yahoo.search.dispatch.rpc.ProtobufSerialization;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.searchchain.Execution;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RpcSearchInvoker
extends SearchInvoker
implements Client.ResponseReceiver {
    private static final String RPC_METHOD = "vespa.searchprotocol.search";
    private final VespaBackEndSearcher searcher;
    private final Node node;
    private final RpcResourcePool resourcePool;
    private final BlockingQueue<Client.ResponseOrError<Client.ProtobufResponse>> responses;
    private final int maxHits;
    private Query query;

    RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool, int maxHits) {
        super(Optional.of(node));
        this.searcher = searcher;
        this.node = node;
        this.resourcePool = resourcePool;
        this.responses = new LinkedBlockingQueue<Client.ResponseOrError<Client.ProtobufResponse>>(1);
        this.maxHits = maxHits;
    }

    @Override
    protected Object sendSearchRequest(Query query, Object incomingContext) {
        this.query = query;
        Client.NodeConnection nodeConnection = this.resourcePool.getConnection(this.node.key());
        if (nodeConnection == null) {
            this.responses.add(Client.ResponseOrError.fromError("Could not send search to unknown node " + this.node.key()));
            this.responseAvailable();
            return incomingContext;
        }
        query.trace(false, 5, "Sending search request with jrt/protobuf to node with dist key ", this.node.key());
        RpcContext context = this.getContext(incomingContext);
        double timeoutSeconds = ((double)query.getTimeLeft() - 3.0) / 1000.0;
        nodeConnection.request(RPC_METHOD, context.compressedPayload.type(), context.compressedPayload.uncompressedSize(), context.compressedPayload.data(), this, timeoutSeconds);
        return context;
    }

    private RpcContext getContext(Object incomingContext) {
        if (incomingContext instanceof RpcContext) {
            return (RpcContext)incomingContext;
        }
        return new RpcContext(this.resourcePool, this.query, ProtobufSerialization.serializeSearchRequest(this.query, Math.min(this.query.getHits(), this.maxHits), this.searcher.getServerId()));
    }

    @Override
    protected InvokerResult getSearchResult(Execution execution) throws IOException {
        long timeLeftMs = this.query.getTimeLeft();
        if (timeLeftMs <= 0L) {
            return this.errorResult(this.query, ErrorMessage.createTimeout("Timeout while waiting for " + this.getName()));
        }
        Client.ResponseOrError<Client.ProtobufResponse> response = null;
        try {
            response = this.responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        if (response == null) {
            return this.errorResult(this.query, ErrorMessage.createTimeout("Timeout while waiting for " + this.getName()));
        }
        if (response.error().isPresent()) {
            return this.errorResult(this.query, ErrorMessage.createBackendCommunicationError(response.error().get()));
        }
        if (response.response().isEmpty()) {
            return this.errorResult(this.query, ErrorMessage.createInternalServerError("Neither error nor result available"));
        }
        Client.ProtobufResponse protobufResponse = response.response().get();
        CompressionType compression = CompressionType.valueOf((byte)protobufResponse.compression());
        byte[] payload = this.resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, protobufResponse.uncompressedSize());
        return ProtobufSerialization.deserializeToSearchResult(payload, this.query, this.searcher, this.node.pathIndex(), this.node.key());
    }

    @Override
    protected void release() {
    }

    @Override
    public void receive(Client.ResponseOrError<Client.ProtobufResponse> response) {
        this.responses.add(response);
        this.responseAvailable();
    }

    private String getName() {
        return this.searcher.getName();
    }

    static class RpcContext {
        final Compressor.Compression compressedPayload;

        RpcContext(RpcResourcePool resourcePool, Query query, byte[] payload) {
            this.compressedPayload = resourcePool.compress(query, payload);
        }
    }
}

