/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.messagebus;

import com.yahoo.concurrent.SystemTimer;
import com.yahoo.jrt.slobrok.api.IMirror;
import com.yahoo.jrt.slobrok.api.Mirror;
import com.yahoo.messagebus.ConfigHandler;
import com.yahoo.messagebus.DestinationSession;
import com.yahoo.messagebus.DestinationSessionParams;
import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.IntermediateSession;
import com.yahoo.messagebus.IntermediateSessionParams;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageBusParams;
import com.yahoo.messagebus.MessageHandler;
import com.yahoo.messagebus.Messenger;
import com.yahoo.messagebus.Protocol;
import com.yahoo.messagebus.ProtocolRepository;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.SendProxy;
import com.yahoo.messagebus.SourceSession;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.network.Network;
import com.yahoo.messagebus.network.NetworkMultiplexer;
import com.yahoo.messagebus.network.NetworkOwner;
import com.yahoo.messagebus.routing.Resender;
import com.yahoo.messagebus.routing.RetryPolicy;
import com.yahoo.messagebus.routing.RoutingPolicy;
import com.yahoo.messagebus.routing.RoutingSpec;
import com.yahoo.messagebus.routing.RoutingTable;
import com.yahoo.messagebus.routing.RoutingTableSpec;
import com.yahoo.protect.Process;
import com.yahoo.text.Utf8Array;
import com.yahoo.text.Utf8String;
import com.yahoo.vespa.defaults.Defaults;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MessageBus
implements ConfigHandler,
NetworkOwner,
MessageHandler,
ReplyHandler {
    private static final Logger log = Logger.getLogger(MessageBus.class.getName());
    private final AtomicBoolean destroyed = new AtomicBoolean(false);
    private final ProtocolRepository protocolRepository = new ProtocolRepository();
    private final AtomicReference<Map<String, RoutingTable>> tablesRef = new AtomicReference<Object>(null);
    private final Map<String, MessageHandler> sessions = new ConcurrentHashMap<String, MessageHandler>();
    private final NetworkMultiplexer net;
    private final Messenger msn;
    private final Resender resender;
    private int maxPendingCount;
    private int maxPendingSize;
    private int pendingCount = 0;
    private int pendingSize = 0;
    private final Thread careTaker = new Thread(this::sendBlockedMessages);
    private final Map<SendBlockedMessages, Long> blockedSenders = new ConcurrentHashMap<SendBlockedMessages, Long>();

    public void register(SendBlockedMessages sender) {
        this.blockedSenders.put(sender, SystemTimer.INSTANCE.milliTime());
    }

    private void sendBlockedMessages() {
        long timeout = SystemTimer.adjustTimeoutByDetectedHz((Duration)Duration.ofMillis(10L)).toMillis();
        while (!this.destroyed.get()) {
            for (SendBlockedMessages sender : this.blockedSenders.keySet()) {
                if (sender.trySend()) continue;
                this.blockedSenders.remove(sender);
            }
            try {
                Thread.sleep(timeout);
            }
            catch (InterruptedException e) {
                return;
            }
        }
    }

    public MessageBus(Network net, List<Protocol> protocols) {
        this(net, new MessageBusParams().addProtocols(protocols));
    }

    public MessageBus(Network net, MessageBusParams params) {
        this(NetworkMultiplexer.dedicated(net), params);
    }

    public MessageBus(NetworkMultiplexer net, MessageBusParams params) {
        this.maxPendingCount = params.getMaxPendingCount();
        this.maxPendingSize = params.getMaxPendingSize();
        int len = params.getNumProtocols();
        for (int i = 0; i < len; ++i) {
            this.protocolRepository.putProtocol(params.getProtocol(i));
        }
        this.net = net;
        net.attach(this);
        if (!net.net().waitUntilReady(180.0)) {
            try {
                IMirror tmp = net.net().getMirror();
                Mirror mirror = (Mirror)tmp;
                if (mirror.getIterations() < 2L) {
                    Process.dumpThreads();
                    String fn = "var/crash/java_pid." + ProcessHandle.current().pid() + ".hprof";
                    Process.dumpHeap((String)Defaults.getDefaults().underVespaHome(fn), (boolean)true);
                }
            }
            catch (Exception tmp) {
                // empty catch block
            }
            throw new IllegalStateException("Network failed to become ready in time.");
        }
        this.msn = new Messenger();
        RetryPolicy retryPolicy = params.getRetryPolicy();
        if (retryPolicy != null) {
            this.resender = new Resender(retryPolicy);
            this.msn.addRecurrentTask(new ResenderTask(this.resender));
        } else {
            this.resender = null;
        }
        this.careTaker.setDaemon(true);
        this.careTaker.start();
        this.msn.start();
    }

    public boolean destroy() {
        if (!this.destroyed.getAndSet(true)) {
            try {
                this.careTaker.join();
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            this.protocolRepository.clearPolicyCache();
            this.net.detach(this);
            this.msn.destroy();
            if (this.resender != null) {
                this.resender.destroy();
            }
            return true;
        }
        return false;
    }

    public void sync() {
        this.msn.sync();
        this.net.net().sync();
    }

    public SourceSession createSourceSession(ReplyHandler handler) {
        return this.createSourceSession(new SourceSessionParams().setReplyHandler(handler));
    }

    public SourceSession createSourceSession(ReplyHandler handler, SourceSessionParams params) {
        return this.createSourceSession(new SourceSessionParams(params).setReplyHandler(handler));
    }

    public SourceSession createSourceSession(SourceSessionParams params) {
        if (this.destroyed.get()) {
            throw new IllegalStateException("Object is destroyed.");
        }
        return new SourceSession(this, params);
    }

    public IntermediateSession createIntermediateSession(String name, boolean broadcastName, MessageHandler msgHandler, ReplyHandler replyHandler) {
        return this.createIntermediateSession(new IntermediateSessionParams().setName(name).setBroadcastName(broadcastName).setMessageHandler(msgHandler).setReplyHandler(replyHandler));
    }

    public synchronized IntermediateSession createIntermediateSession(IntermediateSessionParams params) {
        IntermediateSession session = this.createDetachedIntermediateSession(params);
        this.connect(params.getName(), params.getBroadcastName());
        return session;
    }

    public synchronized IntermediateSession createDetachedIntermediateSession(IntermediateSessionParams params) {
        if (this.destroyed.get()) {
            throw new IllegalStateException("Object is destroyed.");
        }
        if (this.sessions.containsKey(params.getName())) {
            throw new IllegalArgumentException("Name '" + params.getName() + "' is not unique.");
        }
        IntermediateSession session = new IntermediateSession(this, params);
        this.sessions.put(params.getName(), session);
        return session;
    }

    public DestinationSession createDestinationSession(String name, boolean broadcastName, MessageHandler handler) {
        return this.createDestinationSession(new DestinationSessionParams().setName(name).setBroadcastName(broadcastName).setMessageHandler(handler));
    }

    public synchronized DestinationSession createDestinationSession(DestinationSessionParams params) {
        DestinationSession session = this.createDetachedDestinationSession(params);
        this.connect(params.getName(), params.getBroadcastName());
        return session;
    }

    public synchronized DestinationSession createDetachedDestinationSession(DestinationSessionParams params) {
        if (this.destroyed.get()) {
            throw new IllegalStateException("Object is destroyed.");
        }
        if (this.sessions.containsKey(params.getName())) {
            throw new IllegalArgumentException("Name '" + params.getName() + "' is not unique.");
        }
        DestinationSession session = new DestinationSession(this, params);
        this.sessions.put(params.getName(), session);
        return session;
    }

    public void connect(String session, boolean broadcast) {
        this.net.registerSession(session, this, broadcast);
    }

    public synchronized void unregisterSession(String name, boolean broadcastName) {
        this.net.unregisterSession(name, this, broadcastName);
        this.sessions.remove(name);
    }

    private boolean doAccounting() {
        return this.maxPendingCount > 0 || this.maxPendingSize > 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean checkPending(Message msg) {
        boolean busy = false;
        int size = msg.getApproxSize();
        if (this.doAccounting()) {
            MessageBus messageBus = this;
            synchronized (messageBus) {
                boolean bl = busy = this.maxPendingCount > 0 && this.pendingCount >= this.maxPendingCount || this.maxPendingSize > 0 && this.pendingSize >= this.maxPendingSize;
                if (!busy) {
                    ++this.pendingCount;
                    this.pendingSize += size;
                }
            }
        }
        if (busy) {
            return false;
        }
        msg.setContext(size);
        msg.pushHandler(this);
        return true;
    }

    @Override
    public void handleMessage(Message msg) {
        if (this.resender != null && msg.hasBucketSequence()) {
            this.deliverError(msg, 200014, "Bucket sequences not supported when resender is enabled.");
            return;
        }
        SendProxy proxy = new SendProxy(this, this.net.net(), this.resender);
        this.msn.deliverMessage(msg, proxy);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handleReply(Reply reply) {
        if (this.destroyed.get()) {
            reply.discard();
            return;
        }
        if (this.doAccounting()) {
            MessageBus messageBus = this;
            synchronized (messageBus) {
                --this.pendingCount;
                this.pendingSize -= ((Integer)reply.getContext()).intValue();
            }
        }
        this.deliverReply(reply, reply.popHandler());
    }

    @Override
    public void deliverMessage(Message msg, String session) {
        MessageHandler msgHandler = this.sessions.get(session);
        if (msgHandler == null) {
            this.deliverError(msg, 100004, "Session '" + session + "' does not exist.");
        } else if (!this.checkPending(msg)) {
            this.deliverError(msg, 100005, "Session '" + this.net.net().getConnectionSpec() + "/" + session + "' is busy, try again later.");
        } else {
            this.msn.deliverMessage(msg, msgHandler);
        }
    }

    public void putProtocol(Protocol protocol) {
        this.protocolRepository.putProtocol(protocol);
    }

    @Override
    public Protocol getProtocol(Utf8Array name) {
        return this.protocolRepository.getProtocol(name.toString());
    }

    public void deliverReply(Reply reply, ReplyHandler handler) {
        this.msn.deliverReply(reply, handler);
    }

    @Override
    public void setupRouting(RoutingSpec spec) {
        HashMap<String, RoutingTable> tables = new HashMap<String, RoutingTable>();
        int len = spec.getNumTables();
        for (int i = 0; i < len; ++i) {
            RoutingTableSpec table = spec.getTable(i);
            String name = table.getProtocol();
            if (!this.protocolRepository.hasProtocol(name)) {
                log.log(Level.INFO, "Protocol '" + name + "' is not supported, ignoring routing table.");
                continue;
            }
            tables.put(name, new RoutingTable(table));
        }
        this.tablesRef.set(tables);
        this.protocolRepository.clearPolicyCache();
    }

    @Deprecated
    public Resender getResender() {
        return this.resender;
    }

    @Deprecated
    public synchronized int getPendingCount() {
        return this.pendingCount;
    }

    @Deprecated
    public synchronized int getPendingSize() {
        return this.pendingSize;
    }

    @Deprecated
    public void setMaxPendingCount(int maxCount) {
        this.maxPendingCount = maxCount;
    }

    @Deprecated
    public int getMaxPendingCount() {
        return this.maxPendingCount;
    }

    @Deprecated
    public void setMaxPendingSize(int maxSize) {
        this.maxPendingSize = maxSize;
    }

    @Deprecated
    public int getMaxPendingSize() {
        return this.maxPendingSize;
    }

    public RoutingTable getRoutingTable(String name) {
        Map<String, RoutingTable> tables = this.tablesRef.get();
        if (tables == null) {
            return null;
        }
        return tables.get(name);
    }

    public RoutingTable getRoutingTable(Utf8String name) {
        return this.getRoutingTable(name.toString());
    }

    public RoutingPolicy getRoutingPolicy(String protocolName, String policyName, String policyParam) {
        return this.protocolRepository.getRoutingPolicy(protocolName, policyName, policyParam);
    }

    public RoutingPolicy getRoutingPolicy(Utf8String protocolName, String policyName, String policyParam) {
        return this.protocolRepository.getRoutingPolicy(protocolName.toString(), policyName, policyParam);
    }

    public String getConnectionSpec() {
        return this.net.net().getConnectionSpec();
    }

    private void deliverError(Message msg, int errCode, String errMsg) {
        EmptyReply reply = new EmptyReply();
        reply.swapState(msg);
        reply.addError(new Error(errCode, errMsg));
        this.deliverReply(reply, reply.popHandler());
    }

    public static interface SendBlockedMessages {
        public boolean trySend();
    }

    private static class ResenderTask
    implements Messenger.Task {
        final Resender resender;

        ResenderTask(Resender resender) {
            this.resender = resender;
        }

        @Override
        public void destroy() {
        }

        @Override
        public void run() {
            this.resender.resendScheduled();
        }
    }
}

