/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.config.proxy;

import com.yahoo.jrt.Acceptor;
import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.Method;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.StringArray;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.TargetWatcher;
import com.yahoo.jrt.Value;
import com.yahoo.vespa.config.JRTMethods;
import com.yahoo.vespa.config.RawConfig;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3;
import com.yahoo.vespa.config.proxy.DelayedResponse;
import com.yahoo.vespa.config.proxy.MemoryCache;
import com.yahoo.vespa.config.proxy.ProxyServer;
import com.yahoo.vespa.config.proxy.ResponseHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * RPC front end of the config proxy.
 *
 * <p>Registers the RPC methods declared in {@link #declareConfigMethods()} on the
 * supplied {@link Supervisor} and hands every incoming request to a fixed-size
 * executor, so that handlers do not run on the thread that delivered the request.
 * The class also implements {@link TargetWatcher}: it registers itself on the
 * target of each getConfig request and drops delayed responses for a target when
 * {@link #notifyTargetInvalid(Target)} is called for it.
 */
public class ConfigProxyRpcServer
implements Runnable,
TargetWatcher {
    private static final Logger log = Logger.getLogger(ConfigProxyRpcServer.class.getName());
    /** Trace level used when adding entries to a request's trace. */
    static final int TRACELEVEL = 6;
    /** Number of worker threads executing RPC handlers. */
    private static final int RPC_EXECUTOR_THREADS = 8;
    private final Spec spec;
    private final Supervisor supervisor;
    private final ProxyServer proxyServer;
    private final ExecutorService rpcExecutor = Executors.newFixedThreadPool(RPC_EXECUTOR_THREADS);

    /**
     * Creates the server and registers all RPC methods on the supervisor.
     * Listening does not start until {@link #run()} is invoked.
     *
     * @param proxyServer the proxy server that requests are delegated to
     * @param supervisor  the supervisor the RPC methods are registered on
     * @param spec        the spec to listen on
     */
    ConfigProxyRpcServer(ProxyServer proxyServer, Supervisor supervisor, Spec spec) {
        this.proxyServer = proxyServer;
        this.spec = spec;
        this.supervisor = supervisor;
        declareConfigMethods();
    }

    /**
     * Starts listening on the configured spec and then waits on the supervisor's
     * transport (join) before shutting the acceptor down.
     *
     * @throws RuntimeException if listening on the spec fails; the proxy server is
     *                          stopped before the exception is thrown
     */
    @Override
    public void run() {
        try {
            Acceptor acceptor = supervisor.listen(spec);
            log.log(Level.FINE, () -> "Ready for requests on " + spec);
            supervisor.transport().join();
            acceptor.shutdown().join();
        }
        catch (ListenFailedException e) {
            proxyServer.stop();
            throw new RuntimeException("Could not listen on " + spec, e);
        }
    }

    /**
     * Stops the RPC executor (waiting up to 10 seconds for it to terminate) and
     * then shuts down the supervisor's transport.
     *
     * @throws RuntimeException if the calling thread is interrupted while waiting
     *                          for the executor; the interrupt flag is restored
     *                          before the exception is thrown
     */
    void shutdown() {
        try {
            rpcExecutor.shutdownNow();
            rpcExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        supervisor.transport().shutdown().join();
    }

    /** Returns the spec this server was created with. */
    Spec getSpec() {
        return spec;
    }

    /** Registers every RPC method offered by the proxy on the supervisor. */
    private void declareConfigMethods() {
        supervisor.addMethod(JRTMethods.createConfigV3GetConfigMethod(this::getConfigV3));
        supervisor.addMethod(new Method("ping", "", "i", this::ping)
                .methodDesc("ping")
                .returnDesc(0, "ret code", "return code, 0 is OK"));
        supervisor.addMethod(new Method("listCachedConfig", "", "S", this::listCachedConfig)
                .methodDesc("list cached configs)")
                .returnDesc(0, "data", "string array of configs"));
        supervisor.addMethod(new Method("listCachedConfigFull", "", "S", this::listCachedConfigFull)
                .methodDesc("list cached configs with cache content)")
                .returnDesc(0, "data", "string array of configs"));
        supervisor.addMethod(new Method("listSourceConnections", "", "S", this::listSourceConnections)
                .methodDesc("list config source connections)")
                .returnDesc(0, "data", "string array of source connections"));
        supervisor.addMethod(new Method("invalidateCache", "", "S", this::invalidateCache)
                .methodDesc("invalidate the config cache")
                .returnDesc(0, "data", "0 if success, 1 otherwise"));
        supervisor.addMethod(new Method("updateSources", "s", "s", this::updateSources)
                .methodDesc("update list of config sources")
                .returnDesc(0, "ret", "list of updated config sources"));
        supervisor.addMethod(new Method("setMode", "s", "S", this::setMode)
                .methodDesc("Set config proxy mode { default | memorycache }")
                .returnDesc(0, "ret", "0 if success, 1 otherwise as first element, description as second element"));
        supervisor.addMethod(new Method("getMode", "", "s", this::getMode)
                .methodDesc("What serving mode the config proxy is in (default, memorycache)")
                .returnDesc(0, "ret", "mode as a string"));
        supervisor.addMethod(new Method("dumpCache", "s", "s", this::dumpCache)
                .methodDesc("Dump cache to disk")
                .paramDesc(0, "path", "path to write cache contents to")
                .returnDesc(0, "ret", "Empty string or error message"));
    }

    /**
     * Handles a V3 getConfig request: wraps it, registers this server as a watcher
     * on the request's target and resolves the config.
     */
    private void getConfigV3(Request req) {
        dispatchRpcRequest(req, () -> {
            JRTServerConfigRequest request = JRTServerConfigRequestV3.createFromRequest(req);
            req.target().addWatcher(this);
            getConfigImpl(request);
        });
    }

    /** Answers a ping with return code 0. */
    private void ping(Request req) {
        dispatchRpcRequest(req, () -> {
            req.returnValues().add(new Int32Value(0));
            req.returnRequest();
        });
    }

    /** Lists the cached configs without their payload. */
    private void listCachedConfig(Request req) {
        dispatchRpcRequest(req, () -> listCachedConfig(req, false));
    }

    /** Lists the cached configs including their payload. */
    private void listCachedConfigFull(Request req) {
        dispatchRpcRequest(req, () -> listCachedConfig(req, true));
    }

    /** Returns the active source connection and the list of all source connections. */
    private void listSourceConnections(Request req) {
        dispatchRpcRequest(req, () -> {
            String[] ret = {
                    "Current source: " + proxyServer.getActiveSourceConnection(),
                    "All sources:\n" + printSourceConnections()
            };
            req.returnValues().add(new StringArray(ret));
            req.returnRequest();
        });
    }

    /**
     * Replaces the config sources with the comma-separated list in parameter 0,
     * provided the current mode requires a config source; otherwise only a
     * message explaining the refusal is returned.
     */
    private void updateSources(Request req) {
        dispatchRpcRequest(req, () -> {
            String sources = req.parameters().get(0).asString();
            // Logged instead of printed to stdout, so it follows the process' log configuration.
            log.log(Level.FINE, () -> "updateSources requested in mode " + proxyServer.getMode());
            String ret;
            if (proxyServer.getMode().requiresConfigSource()) {
                proxyServer.updateSourceConnections(Arrays.asList(sources.split(",")));
                ret = "Updated config sources to: " + sources;
            } else {
                ret = "Cannot update sources when in '" + proxyServer.getMode().name() + "' mode";
            }
            req.returnValues().add(new StringValue(ret));
            req.returnRequest();
        });
    }

    /** Clears the memory cache and reports success. */
    private void invalidateCache(Request req) {
        dispatchRpcRequest(req, () -> {
            proxyServer.memoryCache().clear();
            String[] s = {"0", "success"};
            req.returnValues().add(new StringArray(s));
            req.returnRequest();
        });
    }

    /**
     * Switches the proxy to the mode named in parameter 0. The reply is a two-element
     * array: "0"/"success", or "1" followed by a description of the failure.
     */
    private void setMode(Request req) {
        dispatchRpcRequest(req, () -> {
            String suppliedMode = req.parameters().get(0).asString();
            String[] s = new String[2];
            try {
                proxyServer.setMode(suppliedMode);
                s[0] = "0";
                s[1] = "success";
            }
            catch (Exception e) {
                s[0] = "1";
                // An exception may carry no message; never put null into the reply.
                s[1] = describe(e);
            }
            req.returnValues().add(new StringArray(s));
            req.returnRequest();
        });
    }

    /** Returns the name of the current proxy mode. */
    private void getMode(Request req) {
        dispatchRpcRequest(req, () -> {
            req.returnValues().add(new StringValue(proxyServer.getMode().name()));
            req.returnRequest();
        });
    }

    /** Dumps the memory cache to the path in parameter 0 and returns the resulting message. */
    private void dumpCache(Request req) {
        dispatchRpcRequest(req, () -> {
            MemoryCache memoryCache = proxyServer.memoryCache();
            String path = req.parameters().get(0).asString();
            req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(path, memoryCache)));
            req.returnRequest();
        });
    }

    /**
     * Detaches the request and runs the handler on the RPC executor. Exceptions
     * thrown by the handler are logged at WARNING level and not propagated.
     *
     * @param request the incoming request
     * @param handler the work to perform for the request
     */
    private void dispatchRpcRequest(Request request, Runnable handler) {
        // NOTE(review): detach presumably lets the request be answered later from the executor thread - confirm.
        request.detach();
        log.log(Level.FINEST, () -> String.format("Dispatching RPC request %s", requestLogId(request)));
        rpcExecutor.execute(() -> {
            try {
                log.log(Level.FINEST, () -> String.format("Executing RPC request %s.", requestLogId(request)));
                handler.run();
            }
            catch (Exception e) {
                log.log(Level.WARNING, String.format("Exception thrown during execution of RPC request %s: %s", requestLogId(request), e.getMessage()), e);
            }
        });
    }

    /** Builds a log identifier from the request's method name and its hash code in hex. */
    private String requestLogId(Request request) {
        return String.format("%s/%08X", request.methodName(), request.hashCode());
    }

    /** Returns the exception's message, or its string form when it has no message. */
    private static String describe(Exception e) {
        String message = e.getMessage();
        return message != null ? message : e.toString();
    }

    /**
     * Validates the request and resolves its config through the proxy server.
     * A response is sent only when the parameters are invalid (error), when
     * resolving fails (error), or when a config is available and
     * {@code ProxyServer.configOrGenerationHasChanged} reports a change; in the
     * remaining cases nothing is sent from here.
     */
    private void getConfigImpl(JRTServerConfigRequest request) {
        ResponseHandler responseHandler = new ResponseHandler();
        request.getRequestTrace().trace(TRACELEVEL, "Config proxy getConfig()");
        log.log(Level.FINE, () -> "getConfig: " + request);
        if (!request.validateParameters()) {
            log.log(Level.WARNING, "Invalid parameters for request " + request + ": " + request.errorCode() + " : " + request.errorMessage());
            responseHandler.returnErrorResponse(request, request.errorCode(), "Invalid parameters for request " + request + ": " + request.errorMessage());
            return;
        }
        try {
            Optional<RawConfig> config = proxyServer.resolveConfig(request);
            if (config.isEmpty()) {
                log.log(Level.FINEST, () -> "No config received yet for " + request + ", not sending response");
            } else if (ProxyServer.configOrGenerationHasChanged(config.get(), request)) {
                responseHandler.returnOkResponse(request, config.get());
            } else {
                log.log(Level.FINEST, () -> "No new config for " + request + ", not sending response");
            }
        }
        catch (Exception e) {
            log.log(Level.WARNING, "Resolving config " + request + " failed", e);
            // NOTE(review): 100200 looks like the project's internal-error code - confirm against ErrorCode.
            responseHandler.returnErrorResponse(request, 100200, describe(e));
        }
    }

    /** Returns all source connections, one per line. */
    private String printSourceConnections() {
        StringBuilder sb = new StringBuilder();
        for (String s : proxyServer.getSourceConnections()) {
            sb.append(s).append("\n");
        }
        return sb.toString();
    }

    /**
     * Returns one sorted line per cached config of the form
     * {@code namespace.name,configId,generation,checksums[,payload]}.
     *
     * @param req  the request to answer
     * @param full whether to append the payload to each line
     */
    private void listCachedConfig(Request req, boolean full) {
        MemoryCache cache = proxyServer.memoryCache();
        // Collect into a list instead of an array pre-sized from cache.size(): the cache can
        // change between size() and values(), which would overflow the array or leave null slots.
        List<String> entries = new ArrayList<>();
        for (RawConfig config : cache.values()) {
            StringBuilder sb = new StringBuilder();
            sb.append(config.getNamespace());
            sb.append(".");
            sb.append(config.getName());
            sb.append(",");
            sb.append(config.getConfigId());
            sb.append(",");
            sb.append(config.getGeneration());
            sb.append(",");
            sb.append(config.getPayloadChecksums());
            if (full) {
                sb.append(",");
                sb.append(config.getPayload());
            }
            entries.add(sb.toString());
        }
        String[] ret = entries.toArray(new String[0]);
        Arrays.sort(ret);
        req.returnValues().add(new StringArray(ret));
        req.returnRequest();
    }

    /**
     * Removes every delayed response whose request belongs to the given target.
     *
     * @param target the target that has become invalid
     */
    @Override
    public void notifyTargetInvalid(Target target) {
        log.log(Level.FINE, () -> "Target invalid " + target);
        Iterator<DelayedResponse> it = proxyServer.delayedResponses().responses().iterator();
        while (it.hasNext()) {
            DelayedResponse delayed = it.next();
            JRTServerConfigRequest request = delayed.getRequest();
            if (!request.getRequest().target().equals(target)) continue;
            log.log(Level.FINE, () -> "Removing " + request.getShortDescription());
            it.remove();
        }
    }
}

