/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.messagebus.network.rpc;

import com.yahoo.component.Version;
import com.yahoo.component.Vtag;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.jrt.Acceptor;
import com.yahoo.jrt.ListenFailedException;
import com.yahoo.jrt.Method;
import com.yahoo.jrt.MethodHandler;
import com.yahoo.jrt.Request;
import com.yahoo.jrt.Spec;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.Supervisor;
import com.yahoo.jrt.Task;
import com.yahoo.jrt.Transport;
import com.yahoo.jrt.Value;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.network.Identity;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.NetworkOwner;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.network.rpc.RPCSendAdapter;
import com.yahoo.messagebus.network.rpc.RPCSendV1;
import com.yahoo.messagebus.network.rpc.RPCSendV2;
import com.yahoo.messagebus.network.rpc.RPCServiceAddress;
import com.yahoo.messagebus.network.rpc.RPCServicePool;
import com.yahoo.messagebus.network.rpc.RPCTarget;
import com.yahoo.messagebus.network.rpc.RPCTargetPool;
import com.yahoo.messagebus.network.rpc.SlobrokConfigSubscriber;
import com.yahoo.messagebus.routing.Hop;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingNode;
import com.yahoo.text.Utf8Array;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class RPCNetwork
implements Network,
MethodHandler {
    private static final Logger log = Logger.getLogger(RPCNetwork.class.getName());

    /** Upper bound on the number of parsed routes retained by {@link #lruRouteMap}; mirrors its initial capacity. */
    private static final int MAX_CACHED_ROUTES = 10000;

    /** Set to true exactly once by {@link #destroy()}; consulted before a message is handed to the executor. */
    private final AtomicBoolean destroyed = new AtomicBoolean(false);
    private final Identity identity;
    private final Supervisor orb;
    private final RPCTargetPool targetPool;
    private final RPCServicePool servicePool;
    private final Acceptor listener;
    private final Mirror mirror;
    private final Register register;

    /** Send adapters keyed by version; {@link #getSendAdapter(Version)} picks the greatest key not above the request. */
    private final TreeMap<Version, RPCSendAdapter> sendAdapters = new TreeMap<>();

    /** Assigned by {@link #attach(NetworkOwner)}; a second attach is rejected. */
    private NetworkOwner owner;

    /** May be null; shut down by {@link #destroy()} when present. */
    private final SlobrokConfigSubscriber slobroksConfig;

    /**
     * Access-ordered cache from route string to parsed route, used by {@link #getRoute(String)}.
     * The least recently used entry is dropped once more than {@link #MAX_CACHED_ROUTES} entries are held,
     * so the cache cannot grow without bound. An evicted route is simply parsed again on its next use.
     */
    private final LinkedHashMap<String, Route> lruRouteMap = new LinkedHashMap<String, Route>(MAX_CACHED_ROUTES, 0.5f, true) {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, Route> eldest) {
            return size() > MAX_CACHED_ROUTES;
        }
    };

    /**
     * Fixed-size pool with one daemon thread per available processor and a hand-off queue without capacity.
     * With {@link ThreadPoolExecutor.CallerRunsPolicy}, a task submitted while every worker is busy runs on the
     * submitting thread instead of being rejected.
     */
    private final ExecutorService executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(false), ThreadFactoryFactory.getDaemonThreadFactory((String)"mbus.net"), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Creates a network that listens on the port given by the parameters and uses the supplied slobrok
     * configuration for both registration and mirroring.
     *
     * @param params        supplies identity, buffer limits, connection expiry and listen port
     * @param slobrokConfig source of the slobrok list; also retained so {@link #destroy()} can shut it down
     * @throws RuntimeException wrapping the {@link ListenFailedException} if the listen port cannot be opened;
     *                          the transport is shut down before the exception propagates
     */
    public RPCNetwork(RPCNetworkParams params, SlobrokConfigSubscriber slobrokConfig) {
        slobroksConfig = slobrokConfig;
        identity = params.getIdentity();

        orb = new Supervisor(new Transport());
        orb.setMaxInputBufferSize(params.getMaxInputBufferSize());
        orb.setMaxOutputBufferSize(params.getMaxOutputBufferSize());

        targetPool = new RPCTargetPool(params.getConnectionExpireSecs());
        servicePool = new RPCServicePool(this, 4096);

        // Expose the version query; this object handles it through invoke(Request).
        Method getVersionMethod = new Method("mbus.getVersion", "", "s", (MethodHandler)this);
        getVersionMethod.methodDesc("Retrieves the message bus version.");
        getVersionMethod.returnDesc(0, "version", "The message bus version.");
        orb.addMethod(getVersionMethod);

        Acceptor acceptor;
        try {
            acceptor = orb.listen(new Spec(params.getListenPort()));
        }
        catch (ListenFailedException e) {
            // Do not leave the transport running when construction fails.
            orb.transport().shutdown().join();
            throw new RuntimeException(e);
        }
        listener = acceptor;

        // Start the periodic target-pool flush right away rather than after the first one-second delay.
        new TargetPoolTask(targetPool, orb).jrtTask.scheduleNow();

        register = new Register(orb, slobrokConfig.getSlobroks(), identity.getHostname(), listener.port());
        mirror = new Mirror(orb, slobrokConfig.getSlobroks());
    }

    /**
     * Creates a network whose slobrok subscriber is derived from the parameters: the explicit slobrok
     * configuration is used when the parameters carry one, otherwise the slobrok config id is used.
     *
     * @param params the network parameters
     */
    public RPCNetwork(RPCNetworkParams params) {
        this(params, params.getSlobroksConfig() == null
                ? new SlobrokConfigSubscriber(params.getSlobrokConfigId())
                : new SlobrokConfigSubscriber(params.getSlobroksConfig()));
    }

    /** Asks the target pool to flush its targets, passing {@code true} as the flag (the periodic task passes {@code false}). */
    protected void flushTargetPool() {
        targetPool.flushTargets(true);
    }

    /**
     * Returns a new {@link Route} built from the parsed form of the given route string, parsing the string
     * only when it is not already cached.
     *
     * <p>The cache is an access-ordered {@link LinkedHashMap}, for which even {@code get} reorders the
     * internal list. All access is therefore guarded by the map's own monitor so that concurrent callers
     * cannot corrupt it. Parsing happens outside the lock; two threads missing on the same string may both
     * parse it, and the later {@code put} simply replaces the earlier one.
     *
     * @param routeString the textual route to resolve
     * @return a fresh route object constructed from the cached one, so callers never share the cached instance
     */
    final Route getRoute(String routeString) {
        Route cached;
        synchronized (this.lruRouteMap) {
            cached = this.lruRouteMap.get(routeString);
        }
        if (cached == null) {
            cached = Route.parse(routeString);
            synchronized (this.lruRouteMap) {
                this.lruRouteMap.put(routeString, cached);
            }
        }
        return new Route(cached);
    }

    /**
     * Polls the slobrok mirror every 10 milliseconds until it reports ready or the given time is used up.
     *
     * <p>The budget is counted in polls ({@code seconds * 100} of them), so time spent inside the readiness
     * check itself is not accounted for. If the waiting thread is interrupted, the interrupt status is
     * restored and the method returns immediately with the mirror's current state instead of silently
     * discarding the interrupt and continuing to wait.
     *
     * @param seconds the approximate maximum time to wait
     * @return true if the mirror became ready, false on timeout or if interrupted while it was not ready
     */
    @Override
    public boolean waitUntilReady(double seconds) {
        for (int i = 0; (double)i < seconds * 100.0; ++i) {
            if (this.mirror.ready()) {
                return true;
            }
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop waiting.
                Thread.currentThread().interrupt();
                return this.mirror.ready();
            }
        }
        return false;
    }

    /**
     * Resolves a service address for the first hop of the recipient's route.
     *
     * @param recipient the routing node to resolve an address for; receives the error on failure
     * @return true if an address was assigned, false if resolution failed
     */
    @Override
    public boolean allocServiceAddress(RoutingNode recipient) {
        String serviceName = recipient.getRoute().getHop(0).getServiceName();
        Error failure = resolveServiceAddress(recipient, serviceName);
        if (failure != null) {
            recipient.setError(failure);
            return false;
        }
        return true;
    }

    /**
     * Releases the recipient's service address: drops one reference on its target, if it has one, and
     * clears the address on the recipient.
     *
     * @param recipient the routing node whose RPC service address is released
     */
    @Override
    public void freeServiceAddress(RoutingNode recipient) {
        RPCServiceAddress address = (RPCServiceAddress)recipient.getServiceAddress();
        RPCTarget heldTarget = address.getTarget();
        if (heldTarget != null) {
            heldTarget.subRef();
        }
        recipient.setServiceAddress(null);
    }

    /**
     * Binds this network to its owner and installs the send adapters, registered under versions 5 and 6.149.
     *
     * @param owner the owner to attach to
     * @throws IllegalStateException if an owner has already been attached
     */
    @Override
    public void attach(NetworkOwner owner) {
        if (this.owner != null) {
            throw new IllegalStateException("Network is already attached to another owner.");
        }
        this.owner = owner;

        RPCSendV1 v1Adapter = new RPCSendV1();
        RPCSendV2 v2Adapter = new RPCSendV2();
        addSendAdapter(new Version(5), v1Adapter);
        addSendAdapter(new Version(6, 149), v2Adapter);
    }

    /**
     * Registers the session with the slobrok register under the name {@code <service prefix>/<session>}.
     *
     * @param session the session name to register
     */
    @Override
    public void registerSession(String session) {
        String qualifiedName = identity.getServicePrefix() + "/" + session;
        register.registerName(qualifiedName);
    }

    /**
     * Removes the name {@code <service prefix>/<session>} from the slobrok register.
     *
     * @param session the session name to unregister
     */
    @Override
    public void unregisterSession(String session) {
        String qualifiedName = identity.getServicePrefix() + "/" + session;
        register.unregisterName(qualifiedName);
    }

    /**
     * Hands a marker task to the transport and blocks until that task has been run.
     */
    @Override
    public void sync() {
        SyncTask barrier = new SyncTask();
        orb.transport().perform((Runnable)barrier);
        barrier.await();
    }

    /** Shuts the network down by delegating to {@link #destroy()}; the result of that call is ignored. */
    @Override
    public void shutdown() {
        destroy();
    }

    /**
     * Builds the connection spec of this network from its hostname and listen port.
     *
     * @return a string of the form {@code tcp/<hostname>:<port>}
     */
    @Override
    public String getConnectionSpec() {
        String hostname = identity.getHostname();
        int port = listener.port();
        return "tcp/" + hostname + ":" + port;
    }

    /**
     * @return the slobrok mirror created by this network
     */
    @Override
    public IMirror getMirror() {
        return mirror;
    }

    /**
     * Handles the {@code mbus.getVersion} method registered in the constructor by appending the textual
     * form of {@link #getVersion()} to the request's return values.
     *
     * @param request the incoming request to answer
     */
    public void invoke(Request request) {
        String versionText = getVersion().toString();
        request.returnValues().add((Value)new StringValue(versionText));
    }

    /**
     * Starts sending a message by asking every recipient's target to resolve its version. The actual
     * transmission is triggered from the send context once all recipients have reported back.
     *
     * @param msg        the message to send
     * @param recipients the routing nodes to send to; each must hold an {@link RPCServiceAddress}
     */
    @Override
    public void send(Message msg, List<RoutingNode> recipients) {
        SendContext context = new SendContext(this, msg, recipients);
        // Remaining time is reported in milliseconds; version resolution takes seconds.
        double timeoutSeconds = (double)context.msg.getTimeRemainingNow() / 1000.0;
        for (RoutingNode node : context.recipients) {
            RPCTarget target = ((RPCServiceAddress)node.getServiceAddress()).getTarget();
            target.resolveVersion(timeoutSeconds, context);
        }
    }

    /**
     * Renders the recipients of a send context as a comma-separated list for use in error messages.
     * Each recipient appears as {@code <service name> at <connection spec>}, or as a fixed placeholder
     * when its service address is not an {@link RPCServiceAddress}.
     *
     * @param ctx the send context whose recipients are listed
     * @return the joined description; empty when there are no recipients
     */
    private static String buildRecipientListString(SendContext ctx) {
        StringBuilder joined = new StringBuilder();
        boolean first = true;
        for (RoutingNode node : ctx.recipients) {
            if (!first) {
                joined.append(", ");
            }
            first = false;
            if (node.getServiceAddress() instanceof RPCServiceAddress) {
                RPCServiceAddress rpcAddress = (RPCServiceAddress)node.getServiceAddress();
                joined.append(String.format("%s at %s", rpcAddress.getServiceName(), rpcAddress.getConnectionSpec()));
            } else {
                joined.append("<non-RPC service address>");
            }
        }
        return joined.toString();
    }

    /**
     * Completes a send once every recipient's version is known. Replies with error 200012 if the network
     * has been destroyed, with error 100007 if any version resolution failed, and otherwise queues a
     * {@link SendTask} on the executor.
     *
     * @param ctx the send context that has finished version resolution
     */
    private void send(SendContext ctx) {
        if (destroyed.get()) {
            replyError(ctx, 200012, "Network layer has performed shutdown.");
            return;
        }
        if (ctx.hasError) {
            String description = String.format("An error occurred while resolving version of recipient(s) [%s] from host '%s'.", RPCNetwork.buildRecipientListString(ctx), identity.getHostname());
            replyError(ctx, 100007, description);
            return;
        }
        Protocol protocol = owner.getProtocol((Utf8Array)ctx.msg.getProtocol());
        executor.execute(new SendTask(protocol, ctx));
    }

    /**
     * Tears the network down the first time it is called; later calls do nothing.
     * In order, it shuts down the slobrok config subscriber (if any), the register, the mirror, the
     * listener and the transport, then flushes the target pool and shuts down the executor.
     *
     * @return true if this call performed the teardown, false if the network was already destroyed
     */
    public boolean destroy() {
        if (destroyed.getAndSet(true)) {
            return false;
        }
        if (slobroksConfig != null) {
            slobroksConfig.shutdown();
        }
        register.shutdown();
        mirror.shutdown();
        listener.shutdown().join();
        orb.transport().shutdown().join();
        targetPool.flushTargets(true);
        executor.shutdown();
        return true;
    }

    /**
     * @return the version this network reports, taken from {@code Vtag.currentVersion}; subclasses may override
     */
    protected Version getVersion() {
        Version current = Vtag.currentVersion;
        return current;
    }

    /**
     * Looks up the named service, obtains a target for it, and stores the resulting address on the recipient.
     *
     * @param recipient   the routing node that receives the resolved address
     * @param serviceName the name of the service to resolve
     * @return null on success; otherwise an error with code 100002 when the service pool cannot resolve
     *         the name, or 100003 when the target pool yields no target for the address
     */
    public Error resolveServiceAddress(RoutingNode recipient, String serviceName) {
        RPCServiceAddress address = servicePool.resolve(serviceName);
        if (address == null) {
            String hostname = identity.getHostname();
            return new Error(100002, String.format("The address of service '%s' could not be resolved. It is not currently registered with the Vespa name server. The service must be having problems, or the routing configuration is wrong. Address resolution attempted from host '%s'", serviceName, hostname));
        }

        RPCTarget connection = targetPool.getTarget(orb, address);
        if (connection == null) {
            String hostname = identity.getHostname();
            return new Error(100003, String.format("Failed to connect to service '%s' from host '%s'.", serviceName, hostname));
        }

        address.setTarget(connection);
        recipient.setServiceAddress(address);
        return null;
    }

    /**
     * Attaches the adapter to this network and then registers it under the given version.
     *
     * @param version the version key to register the adapter under
     * @param adapter the adapter to attach and register
     */
    private void addSendAdapter(Version version, RPCSendAdapter adapter) {
        adapter.attach(this);
        sendAdapters.put(version, adapter);
    }

    /**
     * Finds the send adapter registered under the greatest version that is less than or equal to the given one.
     *
     * @param version the version to find an adapter for
     * @return the matching adapter, or null if every registered version is greater than the given one
     */
    public RPCSendAdapter getSendAdapter(Version version) {
        Map.Entry<Version, RPCSendAdapter> floor = sendAdapters.floorEntry(version);
        if (floor == null) {
            return null;
        }
        return floor.getValue();
    }

    /**
     * Delivers an empty reply carrying the given error to the owner, once for every recipient in the context.
     * Each reply's trace level is set to the level recorded in the context.
     *
     * @param ctx     the send context whose recipients are answered
     * @param errCode the error code to report
     * @param errMsg  the error message to report
     */
    private void replyError(SendContext ctx, int errCode, String errMsg) {
        for (RoutingNode node : ctx.recipients) {
            EmptyReply failure = new EmptyReply();
            failure.getTrace().setLevel(ctx.traceLevel);
            failure.addError(new Error(errCode, errMsg));
            owner.deliverReply(failure, node);
        }
    }

    /**
     * @return the owner set by {@link #attach(NetworkOwner)}, or null if none has been attached
     */
    NetworkOwner getOwner() {
        return owner;
    }

    /**
     * @return the identity taken from the parameters this network was constructed with
     */
    public Identity getIdentity() {
        return identity;
    }

    /**
     * @return the port reported by the listener opened in the constructor
     */
    public int getPort() {
        int port = listener.port();
        return port;
    }

    /**
     * @return the supervisor created by this network
     */
    Supervisor getSupervisor() {
        return orb;
    }

    /**
     * @return the executor on which send tasks are run
     */
    ExecutorService getExecutor() {
        return executor;
    }

    /**
     * Self-rescheduling transport task that flushes the target pool roughly once per second,
     * passing {@code false} as the flush flag.
     */
    private static class TargetPoolTask implements Runnable {
        final RPCTargetPool pool;
        final Task jrtTask;

        /**
         * Creates the transport task for this runnable and schedules its first run one second ahead.
         *
         * @param pool the target pool to flush on every run
         * @param orb  the supervisor whose transport hosts the task
         */
        TargetPoolTask(RPCTargetPool pool, Supervisor orb) {
            this.pool = pool;
            Task created = orb.transport().createTask((Runnable)this);
            this.jrtTask = created;
            created.schedule(1.0);
        }

        /** Flushes the pool, then re-arms the task for another second. */
        @Override
        public void run() {
            pool.flushTargets(false);
            jrtTask.schedule(1.0);
        }
    }

    /**
     * State shared by all recipients of one send operation while their versions are being resolved.
     * It counts outstanding version replies, tracks the lowest version seen and whether any resolution
     * failed, and triggers the actual send when the last reply arrives.
     */
    private static class SendContext implements RPCTarget.VersionHandler {
        final RPCNetwork net;
        final Message msg;
        final int traceLevel;
        final List<RoutingNode> recipients;
        boolean hasError = false;
        int pending;
        Version version;

        /**
         * @param net        the network performing the send; also supplies the starting version
         * @param msg        the message being sent; its trace level is captured here
         * @param recipients the recipients, copied into a list owned by this context
         */
        SendContext(RPCNetwork net, Message msg, List<RoutingNode> recipients) {
            this.net = net;
            this.msg = msg;
            this.traceLevel = msg.getTrace().getLevel();
            this.recipients = new LinkedList<RoutingNode>(recipients);
            this.pending = this.recipients.size();
            this.version = net.getVersion();
        }

        /**
         * Records one recipient's resolved version. A null version marks the whole send as failed;
         * otherwise the context keeps the lowest version reported so far. When the final outstanding
         * reply has been counted, the network's send is invoked outside the lock.
         *
         * @param version the resolved version, or null if resolution failed
         */
        @Override
        public void handleVersion(Version version) {
            boolean lastReply;
            synchronized (this) {
                if (version == null) {
                    hasError = true;
                } else if (version.compareTo(this.version) < 0) {
                    this.version = version;
                }
                pending -= 1;
                lastReply = (pending == 0);
            }
            if (lastReply) {
                net.send(this);
            }
        }
    }

    /**
     * One-shot barrier: {@link #run()} releases a latch and {@link #await()} blocks until it has been released.
     */
    private static class SyncTask implements Runnable {
        final CountDownLatch latch = new CountDownLatch(1);

        /** Releases every thread blocked in {@link #await()}. */
        @Override
        public void run() {
            this.latch.countDown();
        }

        /**
         * Blocks until {@link #run()} has been called. If the waiting thread is interrupted, the wait ends
         * early and the thread's interrupt status is restored so the caller can still observe it.
         */
        public void await() {
            try {
                this.latch.await();
            }
            catch (InterruptedException e) {
                // Previously the interrupt was dropped here; keep it visible to the caller.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Executor task that encodes a message once and hands the payload to the send adapter for each recipient.
     * Every failure along the way is reported to all recipients through {@code replyError}.
     */
    private class SendTask implements Runnable {
        final Protocol protocol;
        final SendContext ctx;

        /**
         * @param protocol the protocol used to encode the message
         * @param ctx      the send context holding message, recipients and negotiated version
         */
        SendTask(Protocol protocol, SendContext ctx) {
            this.protocol = protocol;
            this.ctx = ctx;
        }

        /**
         * Performs the send. Replies with error 200009 when no time remains, 200005 when encoding throws or
         * yields no bytes, and 200010 when no send adapter covers the negotiated version.
         */
        @Override
        public void run() {
            long remainingMillis = ctx.msg.getTimeRemainingNow();
            if (remainingMillis <= 0L) {
                replyError(ctx, 200009, "Aborting transmission because zero time remains.");
                return;
            }

            byte[] encoded;
            try {
                encoded = protocol.encode(ctx.version, ctx.msg);
            }
            catch (Exception e) {
                // The full stack trace becomes the error message of the reply.
                StringWriter trace = new StringWriter();
                e.printStackTrace(new PrintWriter(trace));
                replyError(ctx, 200005, trace.toString());
                return;
            }
            boolean nothingEncoded = encoded == null || encoded.length == 0;
            if (nothingEncoded) {
                replyError(ctx, 200005, "Protocol '" + ctx.msg.getProtocol() + "' failed to encode message.");
                return;
            }

            RPCSendAdapter sender = getSendAdapter(ctx.version);
            if (sender == null) {
                replyError(ctx, 200010, "Can not send to version '" + ctx.version + "' recipient.");
                return;
            }
            for (RoutingNode node : ctx.recipients) {
                sender.send(node, ctx.version, encoded, remainingMillis);
            }
        }
    }
}

