/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.search.dispatch.rpc;

import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.rpc.Client;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.yolean.Exceptions;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RpcPing
implements Callable<Pong> {
    private static final String RPC_METHOD = "vespa.searchprotocol.ping";
    private static final CompressionType PING_COMPRESSION = CompressionType.NONE;
    private final Node node;
    private final RpcResourcePool resourcePool;
    private final ClusterMonitor<Node> clusterMonitor;

    public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcResourcePool rpcResourcePool) {
        this.node = node;
        this.resourcePool = rpcResourcePool;
        this.clusterMonitor = clusterMonitor;
    }

    @Override
    public Pong call() throws Exception {
        try {
            LinkedBlockingQueue<Client.ResponseOrError<Client.ProtobufResponse>> queue = new LinkedBlockingQueue<Client.ResponseOrError<Client.ProtobufResponse>>(1);
            this.sendPing(queue);
            Client.ResponseOrError<Client.ProtobufResponse> responseOrError = queue.poll(this.clusterMonitor.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS);
            if (responseOrError == null) {
                return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Timed out waiting for pong from " + this.node));
            }
            if (responseOrError.error().isPresent()) {
                return new Pong(ErrorMessage.createBackendCommunicationError(responseOrError.error().get()));
            }
            return this.decodeReply(responseOrError.response().get());
        }
        catch (RuntimeException e) {
            return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + this.node + ": " + Exceptions.toMessageString((Throwable)e)));
        }
    }

    private void sendPing(LinkedBlockingQueue<Client.ResponseOrError<Client.ProtobufResponse>> queue) {
        Client.NodeConnection connection = this.resourcePool.getConnection(this.node.key());
        byte[] ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray();
        double timeoutSeconds = (double)this.clusterMonitor.getConfiguration().getRequestTimeout() / 1000.0;
        Compressor.Compression compressionResult = this.resourcePool.compressor().compress(PING_COMPRESSION, ping);
        connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), rsp -> queue.add(rsp), timeoutSeconds);
    }

    private Pong decodeReply(Client.ProtobufResponse response) throws InvalidProtocolBufferException {
        CompressionType compression = CompressionType.valueOf((byte)response.compression());
        byte[] responseBytes = this.resourcePool.compressor().decompress(response.compressedPayload(), compression, response.uncompressedSize());
        SearchProtocol.MonitorReply reply = SearchProtocol.MonitorReply.parseFrom((byte[])responseBytes);
        if (reply.getDistributionKey() != this.node.key()) {
            return new Pong(ErrorMessage.createBackendCommunicationError("Expected pong from node id " + this.node.key() + ", response is from id " + reply.getDistributionKey()));
        }
        if (!reply.getOnline()) {
            return new Pong(ErrorMessage.createBackendCommunicationError("Node id " + this.node.key() + " reports being offline"));
        }
        return new Pong(reply.getActiveDocs());
    }
}

