/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.vespa.config.server.rpc;

import com.google.inject.Inject;
import com.yahoo.cloud.config.ConfigserverConfig;
import com.yahoo.component.Version;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.config.FileReference;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.HostLivenessTracker;
import com.yahoo.config.provision.TenantName;
import com.yahoo.jrt.Acceptor;
import com.yahoo.jrt.DataValue;
import com.yahoo.jrt.Int32Value;
import com.yahoo.jrt.Int64Value;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.Method;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Target;
import com.yahoo.jrt.Transport;
import com.yahoo.jrt.Value;
import com.yahoo.log.LogLevel;
import com.yahoo.vespa.config.GetConfigRequest;
import com.yahoo.vespa.config.JRTMethods;
import com.yahoo.vespa.config.protocol.ConfigResponse;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequest;
import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3;
import com.yahoo.vespa.config.protocol.Trace;
import com.yahoo.vespa.config.server.GetConfigContext;
import com.yahoo.vespa.config.server.ReloadListener;
import com.yahoo.vespa.config.server.RequestHandler;
import com.yahoo.vespa.config.server.SuperModelRequestHandler;
import com.yahoo.vespa.config.server.application.ApplicationSet;
import com.yahoo.vespa.config.server.filedistribution.FileServer;
import com.yahoo.vespa.config.server.host.HostRegistries;
import com.yahoo.vespa.config.server.host.HostRegistry;
import com.yahoo.vespa.config.server.monitoring.MetricUpdater;
import com.yahoo.vespa.config.server.monitoring.MetricUpdaterFactory;
import com.yahoo.vespa.config.server.rpc.DelayedConfigResponses;
import com.yahoo.vespa.config.server.rpc.GetConfigProcessor;
import com.yahoo.vespa.config.server.rpc.RpcRequestHandlerProvider;
import com.yahoo.vespa.config.server.rpc.security.RpcAuthorizer;
import com.yahoo.vespa.config.server.tenant.TenantHandlerProvider;
import com.yahoo.vespa.config.server.tenant.TenantListener;
import com.yahoo.vespa.config.server.tenant.TenantRepository;
import com.yahoo.vespa.filedistribution.FileDownloader;
import com.yahoo.vespa.filedistribution.FileReferenceData;
import com.yahoo.vespa.filedistribution.FileReferenceDownload;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Stream;

/**
 * RPC server of the config server. It registers the JRT methods for config requests
 * ({@code getConfigV3}), {@code ping}, {@code printStatistics} and the file distribution
 * methods, and dispatches config requests to a bounded worker pool.
 *
 * <p>The server also listens for config reloads and tenant events: when config for an
 * application is activated or removed, delayed (long-polling) responses for that application
 * are drained and re-submitted for processing.
 */
public class RpcServer implements Runnable, ReloadListener, TenantListener {
    static final String getConfigMethodName = "getConfigV3";
    private static final int TRACELEVEL = 6;
    static final int TRACELEVEL_DEBUG = 9;
    private static final String THREADPOOL_NAME = "rpcserver worker pool";
    /** Seconds to wait for the worker pool to terminate in {@link #stop()}. */
    private static final long SHUTDOWN_TIMEOUT = 60L;
    private static final int JRT_RPC_TRANSPORT_THREADS = RpcServer.threadsToUse();
    private final Supervisor supervisor = new Supervisor(new Transport(JRT_RPC_TRANSPORT_THREADS));
    private Spec spec;
    private final boolean useRequestVersion;
    private final boolean hostedVespa;
    private final boolean canReturnEmptySentinelConfig;
    private static final Logger log = Logger.getLogger(RpcServer.class.getName());
    private final DelayedConfigResponses delayedConfigResponses;
    private final HostRegistry<TenantName> hostRegistry;
    private final Map<TenantName, TenantHandlerProvider> tenantProviders = new ConcurrentHashMap<>();
    private final Map<ApplicationId, ApplicationState> applicationStateMap = new ConcurrentHashMap<>();
    private final SuperModelRequestHandler superModelRequestHandler;
    private final MetricUpdater metrics;
    private final MetricUpdaterFactory metricUpdaterFactory;
    private final HostLivenessTracker hostLivenessTracker;
    private final FileServer fileServer;
    private final RpcAuthorizer rpcAuthorizer;
    private final ThreadPoolExecutor executorService;
    private final FileDownloader downloader;
    private volatile boolean allTenantsLoaded = false;
    // Written by the thread executing run()/stop() and read through isRunning(), hence volatile.
    private volatile boolean isRunning = false;

    /**
     * Creates the server, its worker pool and delayed-response handling, and registers all
     * RPC methods on the supervisor. Listening does not start until {@link #run()} is called.
     *
     * @param config                   config server settings (port, thread counts, queue limit, flags)
     * @param superModelRequestHandler handler used for requests with config id {@code "*"}
     * @param metrics                  factory for metric updaters
     * @param hostRegistries           source of the tenant host registry
     * @param hostLivenessTracker      tracker exposed through {@link #hostLivenessTracker()}
     * @param fileServer               file server used by the file distribution methods
     * @param rpcAuthorizer            authorizer consulted before config and file requests are handled
     * @param handlerProvider          provider that gets this instance registered on it
     */
    @Inject
    public RpcServer(ConfigserverConfig config, SuperModelRequestHandler superModelRequestHandler, MetricUpdaterFactory metrics, HostRegistries hostRegistries, HostLivenessTracker hostLivenessTracker, FileServer fileServer, RpcAuthorizer rpcAuthorizer, RpcRequestHandlerProvider handlerProvider) {
        this.superModelRequestHandler = superModelRequestHandler;
        this.metricUpdaterFactory = metrics;
        this.supervisor.setMaxOutputBufferSize(config.maxoutputbuffersize());
        this.metrics = metrics.getOrCreateMetricUpdater(Collections.emptyMap());
        this.hostLivenessTracker = hostLivenessTracker;
        // Bounded queue: submissions beyond this limit are rejected and answered with an error
        // in addToRequestQueue(JRTServerConfigRequest, boolean, CompletionService).
        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(config.maxgetconfigclients());
        // A configured value of 0 means "pick a default based on available processors".
        int rpcWorkerThreads = config.numRpcThreads() == 0 ? RpcServer.threadsToUse() : config.numRpcThreads();
        this.executorService = new ThreadPoolExecutor(rpcWorkerThreads, rpcWorkerThreads, 0L, TimeUnit.SECONDS, workQueue, ThreadFactoryFactory.getThreadFactory(THREADPOOL_NAME));
        this.delayedConfigResponses = new DelayedConfigResponses(this, config.numDelayedResponseThreads());
        this.spec = new Spec(null, config.rpcport());
        this.hostRegistry = hostRegistries.getTenantHostRegistry();
        this.useRequestVersion = config.useVespaVersionInRequest();
        this.hostedVespa = config.hostedVespa();
        this.canReturnEmptySentinelConfig = config.canReturnEmptySentinelConfig();
        this.fileServer = fileServer;
        this.rpcAuthorizer = rpcAuthorizer;
        this.downloader = fileServer.downloader();
        handlerProvider.setInstance(this);
        this.setUpHandlers();
    }

    /** Returns the default thread count: the number of available processors, but at least 8. */
    private static int threadsToUse() {
        return Math.max(8, Runtime.getRuntime().availableProcessors());
    }

    /**
     * RPC entry point for {@code getConfigV3}. The request is detached and, once the
     * authorizer's future completes normally, queued for processing on the worker pool.
     */
    private void getConfigV3(Request req) {
        if (log.isLoggable(LogLevel.SPAM)) {
            log.log(LogLevel.SPAM, getConfigMethodName);
        }
        req.detach();
        rpcAuthorizer.authorizeConfigRequest(req)
                .thenRun(() -> addToRequestQueue(JRTServerConfigRequestV3.createFromRequest(req)));
    }

    /** RPC entry point for {@code ping}; always answers with return code 0. */
    private void ping(Request req) {
        req.returnValues().add(new Int32Value(0));
    }

    /** RPC entry point for {@code printStatistics}; reports the delayed responses queue size. */
    private void printStatistics(Request req) {
        req.returnValues().add(new StringValue("Delayed responses queue size: " + delayedConfigResponses.size()));
    }

    /**
     * Starts listening on the configured port and blocks until the transport has been shut down.
     *
     * @throws RuntimeException if listening on the configured spec fails; the server is
     *                          stopped before the exception is thrown
     */
    @Override
    public void run() {
        log.log(LogLevel.INFO, "Rpc server will listen on port " + spec.port());
        try {
            Acceptor acceptor = supervisor.listen(spec);
            isRunning = true;
            supervisor.transport().join();
            acceptor.shutdown().join();
        }
        catch (ListenFailedException e) {
            stop();
            throw new RuntimeException("Could not listen at " + spec, e);
        }
    }

    /**
     * Stops the server: shuts down the worker pool (waiting up to {@link #SHUTDOWN_TIMEOUT}
     * seconds for it to terminate), stops delayed response handling and shuts down the transport.
     *
     * <p>If the calling thread is interrupted while waiting for the worker pool, shutdown
     * still runs to completion and the thread's interrupt status is restored afterwards.
     */
    public void stop() {
        executorService.shutdown();
        boolean interrupted = false;
        try {
            executorService.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Do not abort shutdown; remember the interrupt so it is not lost to the caller.
            interrupted = true;
        }
        try {
            delayedConfigResponses.stop();
            supervisor.transport().shutdown().join();
            isRunning = false;
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Returns whether the server has started listening and has not been stopped since. */
    public boolean isRunning() {
        return isRunning;
    }

    /** Registers all RPC methods served by this server on the supervisor. */
    private void setUpHandlers() {
        Supervisor rpc = getSupervisor();
        rpc.addMethod(JRTMethods.createConfigV3GetConfigMethod(this::getConfigV3));
        rpc.addMethod(new Method("ping", "", "i", this::ping)
                .methodDesc("ping")
                .returnDesc(0, "ret code", "return code, 0 is OK"));
        rpc.addMethod(new Method("printStatistics", "", "s", this::printStatistics)
                .methodDesc("printStatistics")
                .returnDesc(0, "statistics", "Statistics for server"));
        rpc.addMethod(new Method("filedistribution.serveFile", "si", "is", this::serveFile));
        rpc.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", this::setFileReferencesToDownload)
                .methodDesc("set which file references to download")
                .paramDesc(0, "file references", "file reference to download")
                .returnDesc(0, "ret", "0 if success, 1 otherwise"));
    }

    /** Returns the state for the given application, creating it with generation 0 if absent. */
    private ApplicationState getState(ApplicationId id) {
        return applicationStateMap.computeIfAbsent(id, ignored -> new ApplicationState(0L));
    }

    /** Returns whether the active generation recorded for the application is newer than the given one. */
    boolean hasNewerGeneration(ApplicationId id, long generation) {
        return getState(id).getActiveGeneration() > generation;
    }

    /**
     * Records the newly activated generation for the application, then re-processes delayed
     * responses for that application and, after reloading the super model, for the global
     * application.
     */
    @Override
    public void configActivated(ApplicationSet applicationSet) {
        ApplicationId applicationId = applicationSet.getId();
        getState(applicationId).setActiveGeneration(applicationSet.getApplicationGeneration());
        configReloaded(applicationId);
        reloadSuperModel(applicationSet);
    }

    /** Reloads the super model with the given application set and notifies global-application waiters. */
    private void reloadSuperModel(ApplicationSet applicationSet) {
        superModelRequestHandler.reloadConfig(applicationSet);
        configReloaded(ApplicationId.global());
    }

    /**
     * Drains the delayed responses of the given application and re-submits every request whose
     * timer could be cancelled, then waits until all successfully submitted requests have been
     * processed. Waiting stops early if the calling thread is interrupted (the interrupt status
     * is preserved).
     */
    void configReloaded(ApplicationId applicationId) {
        List<DelayedConfigResponses.DelayedConfigResponse> responses = delayedConfigResponses.drainQueue(applicationId);
        String logPre = TenantRepository.logPre(applicationId);
        if (log.isLoggable(LogLevel.DEBUG)) {
            log.log(LogLevel.DEBUG, logPre + "Start of configReload: " + responses.size() + " requests on delayed requests queue");
        }
        int responsesSent = 0;
        ExecutorCompletionService<Boolean> completionService = new ExecutorCompletionService<>(executorService);
        // The drained list is only used here, so iterate it instead of repeatedly removing the head.
        for (DelayedConfigResponses.DelayedConfigResponse delayedConfigResponse : responses) {
            if (!delayedConfigResponse.cancel()) {
                log.log(LogLevel.DEBUG, logPre + "Timer already cancelled or finished or never scheduled");
                continue;
            }
            if (log.isLoggable(LogLevel.DEBUG)) {
                logRequestDebug(LogLevel.DEBUG, logPre + "Timer cancelled for ", delayedConfigResponse.request);
            }
            // Only requests accepted by the executor will produce a completion to wait for.
            if (addToRequestQueue(delayedConfigResponse.request, false, completionService)) {
                ++responsesSent;
            }
        }
        for (int i = 0; i < responsesSent; ++i) {
            try {
                completionService.take();
            }
            catch (InterruptedException e) {
                // With the flag set every further take() would fail immediately, so stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        log.log(LogLevel.DEBUG, logPre + "Finished reloading " + responsesSent + " requests");
    }

    /** Logs the message followed by the request's short description, if the level is enabled. */
    private void logRequestDebug(LogLevel level, String message, JRTServerConfigRequest request) {
        if (log.isLoggable(level)) {
            log.log(level, message + request.getShortDescription());
        }
    }

    /** Replaces the hosts registered for the tenant in the tenant host registry. */
    @Override
    public void hostsUpdated(TenantName tenant, Collection<String> newHosts) {
        log.log(LogLevel.DEBUG, "Updating hosts in tenant host registry '" + hostRegistry + "' with " + newHosts);
        hostRegistry.update(tenant, newHosts);
    }

    /** Delegates verification of the given hosts for the tenant to the tenant host registry. */
    @Override
    public void verifyHostsAreAvailable(TenantName tenant, Collection<String> newHosts) {
        hostRegistry.verifyHosts(tenant, newHosts);
    }

    /**
     * Removes the application from the super model and re-processes delayed responses for both
     * the removed application and the global application.
     */
    @Override
    public void applicationRemoved(ApplicationId applicationId) {
        superModelRequestHandler.removeApplication(applicationId);
        configReloaded(applicationId);
        configReloaded(ApplicationId.global());
    }

    /** Returns the underlying (detached) RPC request to the client. */
    public void respond(JRTServerConfigRequest request) {
        if (log.isLoggable(LogLevel.DEBUG)) {
            log.log(LogLevel.DEBUG, "Trace at request return:\n" + request.getRequestTrace().toString());
        }
        request.getRequest().returnRequest();
    }

    /**
     * Resolves which tenant a request belongs to.
     *
     * @return the global application's tenant when the config id is {@code "*"}; otherwise the
     *         tenant registered for the client's host name, or empty if the host is unknown
     */
    Optional<TenantName> resolveTenant(JRTServerConfigRequest request, Trace trace) {
        if ("*".equals(request.getConfigKey().getConfigId())) {
            return Optional.of(ApplicationId.global().tenant());
        }
        String hostname = request.getClientHostName();
        TenantName tenant = hostRegistry.getKeyForHost(hostname);
        if (tenant != null) {
            return Optional.of(tenant);
        }
        if (GetConfigProcessor.logDebug(trace)) {
            String message = "Did not find tenant for host '" + hostname + "', using " + TenantName.defaultName();
            log.log(LogLevel.DEBUG, message);
            log.log(LogLevel.DEBUG, "hosts in host registry: " + hostRegistry.getAllHosts());
            trace.trace(TRACELEVEL, message);
        }
        return Optional.empty();
    }

    /** Resolves config for the request through the request handler of the given context. */
    public ConfigResponse resolveConfig(JRTServerConfigRequest request, GetConfigContext context, Optional<Version> vespaVersion) {
        context.trace().trace(TRACELEVEL, "RpcServer.resolveConfig()");
        return context.requestHandler().resolveConfig(context.applicationId(), (GetConfigRequest)request, vespaVersion);
    }

    private Supervisor getSupervisor() {
        return supervisor;
    }

    /** Queues a request for processing without forcing a response and without completion tracking. */
    private void addToRequestQueue(JRTServerConfigRequest request) {
        addToRequestQueue(request, false, null);
    }

    /**
     * Submits the request for processing on the worker pool.
     *
     * @param request           the request to process
     * @param forceResponse     passed on to the {@link GetConfigProcessor} handling the request
     * @param completionService if non-null, the task is submitted through it so the caller can
     *                          wait for completion; otherwise it is submitted directly
     * @return {@code true} if the request was queued; {@code false} if the executor rejected it,
     *         in which case an error response has already been returned to the client
     */
    public Boolean addToRequestQueue(JRTServerConfigRequest request, boolean forceResponse, CompletionService<Boolean> completionService) {
        request.setDelayedResponse(false);
        try {
            GetConfigProcessor task = new GetConfigProcessor(this, request, forceResponse);
            if (completionService == null) {
                executorService.submit(task);
            } else {
                completionService.submit(() -> {
                    task.run();
                    return true;
                });
            }
            updateWorkQueueMetrics();
            return true;
        }
        catch (RejectedExecutionException e) {
            request.addErrorResponse(100200, "getConfig request queue size is larger than configured max limit");
            respond(request);
            return false;
        }
    }

    /** Reports the current worker queue length as a metric. */
    private void updateWorkQueueMetrics() {
        metrics.setRpcServerQueueSize(executorService.getQueue().size());
    }

    /**
     * Creates the context used to resolve config for a request.
     *
     * @param optionalTenant the resolved tenant; the default tenant is used when empty
     * @return a context for the global application when the config id is {@code "*"}, a context
     *         for the application owning the client host otherwise, or {@code null} when no
     *         request handler is registered for the tenant
     */
    GetConfigContext createGetConfigContext(Optional<TenantName> optionalTenant, JRTServerConfigRequest request, Trace trace) {
        if ("*".equals(request.getConfigKey().getConfigId())) {
            return GetConfigContext.create(ApplicationId.global(), superModelRequestHandler, trace);
        }
        TenantName tenant = optionalTenant.orElse(TenantName.defaultName());
        Optional<RequestHandler> requestHandler = getRequestHandler(tenant);
        if (requestHandler.isEmpty()) {
            String msg = TenantRepository.logPre(tenant) + "Unable to find request handler for tenant. Requested from host '" + request.getClientHostName() + "'";
            metrics.incUnknownHostRequests();
            trace.trace(TRACELEVEL, msg);
            log.log(LogLevel.WARNING, msg);
            return null;
        }
        RequestHandler handler = requestHandler.get();
        ApplicationId applicationId = handler.resolveApplicationId(request.getClientHostName());
        if (trace.shouldTrace(TRACELEVEL_DEBUG)) {
            trace.trace(TRACELEVEL_DEBUG, "Host '" + request.getClientHostName() + "' should have config from application '" + applicationId + "'");
        }
        return GetConfigContext.create(applicationId, handler, trace);
    }

    /** Returns the request handler registered for the tenant, or empty if there is none. */
    Optional<RequestHandler> getRequestHandler(TenantName tenant) {
        return Optional.ofNullable(tenantProviders.get(tenant)).map(TenantHandlerProvider::getRequestHandler);
    }

    /** Hands the request over to delayed response handling. */
    void delayResponse(JRTServerConfigRequest request, GetConfigContext context) {
        delayedConfigResponses.delayResponse(request, context);
    }

    /** Removes the tenant's request handler and its hosts from the host registry. */
    @Override
    public void onTenantDelete(TenantName tenant) {
        log.log(LogLevel.DEBUG, TenantRepository.logPre(tenant) + "Tenant deleted, removing request handler and cleaning host registry");
        tenantProviders.remove(tenant);
        hostRegistry.removeHostsForKey(tenant);
    }

    /** Marks all tenants as loaded and enables the super model request handler. */
    @Override
    public void onTenantsLoaded() {
        allTenantsLoaded = true;
        superModelRequestHandler.enable();
    }

    /** Registers the handler provider for a newly created tenant. */
    @Override
    public void onTenantCreate(TenantName tenant, TenantHandlerProvider tenantHandlerProvider) {
        log.log(LogLevel.DEBUG, TenantRepository.logPre(tenant) + "Tenant created, adding request handler");
        tenantProviders.put(tenant, tenantHandlerProvider);
    }

    /** Returns whether {@link #onTenantsLoaded()} has been called. */
    public boolean allTenantsLoaded() {
        return allTenantsLoaded;
    }

    public boolean isHostedVespa() {
        return hostedVespa;
    }

    public boolean canReturnEmptySentinelConfig() {
        return canReturnEmptySentinelConfig;
    }

    MetricUpdaterFactory metricUpdaterFactory() {
        return metricUpdaterFactory;
    }

    boolean useRequestVersion() {
        return useRequestVersion;
    }

    /**
     * RPC entry point for {@code filedistribution.serveFile}. After authorization the request is
     * handed to the file server together with a receiver that pushes the file back to the
     * requesting target.
     */
    private void serveFile(Request request) {
        request.detach();
        rpcAuthorizer.authorizeFileRequest(request).thenRun(() -> {
            ChunkedFileReceiver receiver = new ChunkedFileReceiver(request.target());
            String fileReference = request.parameters().get(0).asString();
            boolean secondParameterIsZero = request.parameters().get(1).asInt32() == 0;
            fileServer.serveFile(fileReference, secondParameterIsZero, request, receiver);
        });
    }

    /**
     * RPC entry point for {@code filedistribution.setFileReferencesToDownload}. After
     * authorization, a download is requested for every file reference in the first parameter
     * and the request is answered with return code 0.
     */
    private void setFileReferencesToDownload(Request req) {
        req.detach();
        rpcAuthorizer.authorizeFileRequest(req).thenRun(() -> {
            String[] fileReferenceStrings = req.parameters().get(0).asStringArray();
            Stream.of(fileReferenceStrings)
                    .map(FileReference::new)
                    .forEach(fileReference -> downloader.downloadIfNeeded(new FileReferenceDownload(fileReference, false)));
            req.returnValues().add(new Int32Value(0));
            // The request was detached above, so it must be returned explicitly or the
            // caller never receives the answer.
            req.returnRequest();
        });
    }

    HostLivenessTracker hostLivenessTracker() {
        return hostLivenessTracker;
    }

    /**
     * Receiver that delivers file data to an RPC target in three steps: a meta request, a
     * sequence of part requests, and a final end-of-file request.
     */
    class ChunkedFileReceiver
    implements FileServer.Receiver {
        Target target;

        ChunkedFileReceiver(Target target) {
            this.target = target;
        }

        @Override
        public String toString() {
            return target.toString();
        }

        @Override
        public void receive(FileReferenceData fileData, FileServer.ReplayStatus status) {
            int session = sendMeta(fileData);
            sendParts(session, fileData);
            sendEof(session, fileData, status);
        }

        /** Sends the file content in parts of at most 1 MiB (0x100000 bytes) each. */
        private void sendParts(int session, FileReferenceData fileData) {
            ByteBuffer bb = ByteBuffer.allocate(0x100000);
            int partId = 0;
            int read = fileData.nextContent(bb);
            while (read >= 0) {
                byte[] buf = bb.array();
                // A partially filled buffer is copied so that only the bytes written are sent.
                if (buf.length != bb.position()) {
                    buf = new byte[bb.position()];
                    bb.flip();
                    bb.get(buf);
                }
                sendPart(session, fileData.fileReference(), partId, buf);
                bb.clear();
                ++partId;
                read = fileData.nextContent(bb);
            }
        }

        /**
         * Sends file meta data to the target.
         *
         * @return the session id from the target's reply, or 1 if the RPC call itself failed
         * @throws IllegalArgumentException if the target replies with a non-zero return code
         */
        private int sendMeta(FileReferenceData fileData) {
            Request request = new Request("filedistribution.receiveFileMeta");
            request.parameters().add(new StringValue(fileData.fileReference().value()));
            request.parameters().add(new StringValue(fileData.filename()));
            request.parameters().add(new StringValue(fileData.type().name()));
            request.parameters().add(new Int64Value(fileData.size()));
            invokeRpcIfValidConnection(request);
            if (request.isError()) {
                log.warning("Failed delivering meta for reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + target.toString() + " with error: '" + request.errorMessage() + "'.");
                // NOTE(review): the failure is only logged and 1 is used as the session id for
                // the following part/eof requests; confirm this best-effort behavior is intended.
                return 1;
            }
            if (request.returnValues().get(0).asInt32() != 0) {
                throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName());
            }
            return request.returnValues().get(1).asInt32();
        }

        /**
         * Sends one part of the file to the target.
         *
         * @throws IllegalArgumentException if the RPC call fails or the target replies with a
         *                                  non-zero return code
         */
        private void sendPart(int session, FileReference ref, int partId, byte[] buf) {
            Request request = new Request("filedistribution.receiveFilePart");
            request.parameters().add(new StringValue(ref.value()));
            request.parameters().add(new Int32Value(session));
            request.parameters().add(new Int32Value(partId));
            request.parameters().add(new DataValue(buf));
            invokeRpcIfValidConnection(request);
            if (request.isError()) {
                throw new IllegalArgumentException("Failed delivering reference '" + ref.value() + "' to " + target.toString() + " with error: '" + request.errorMessage() + "'.");
            }
            if (request.returnValues().get(0).asInt32() != 0) {
                throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName());
            }
        }

        /**
         * Sends the end-of-file request, carrying the file's xxhash and the replay status.
         *
         * @throws IllegalArgumentException if the RPC call fails or the target replies with a
         *                                  non-zero return code
         */
        private void sendEof(int session, FileReferenceData fileData, FileServer.ReplayStatus status) {
            Request request = new Request("filedistribution.receiveFileEof");
            request.parameters().add(new StringValue(fileData.fileReference().value()));
            request.parameters().add(new Int32Value(session));
            request.parameters().add(new Int64Value(fileData.xxhash()));
            request.parameters().add(new Int32Value(status.getCode()));
            request.parameters().add(new StringValue(status.getDescription()));
            invokeRpcIfValidConnection(request);
            if (request.isError()) {
                throw new IllegalArgumentException("Failed delivering reference '" + fileData.fileReference().value() + "' with file '" + fileData.filename() + "' to " + target.toString() + " with error: '" + request.errorMessage() + "'.");
            }
            if (request.returnValues().get(0).asInt32() != 0) {
                throw new IllegalArgumentException("Unknown error from target '" + target.toString() + "' during rpc call " + request.methodName());
            }
        }

        /**
         * Invokes the request synchronously on the target, with a timeout value of 600.0.
         *
         * @throws RuntimeException if the connection to the target is no longer valid
         */
        private void invokeRpcIfValidConnection(Request request) {
            if (!target.isValid()) {
                throw new RuntimeException("Connection to " + target + " is invalid", target.getConnectionLostReason());
            }
            target.invokeSync(request, 600.0);
        }
    }

    /** Holds the active config generation of one application. */
    static class ApplicationState {
        private final AtomicLong activeGeneration = new AtomicLong(0L);

        ApplicationState(long generation) {
            activeGeneration.set(generation);
        }

        long getActiveGeneration() {
            return activeGeneration.get();
        }

        void setActiveGeneration(long generation) {
            activeGeneration.set(generation);
        }
    }
}

