/*
 * Decompiled with CFR 0.152.
 */
package com.yahoo.messagebus;

import com.yahoo.messagebus.EmptyReply;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.Result;
import com.yahoo.messagebus.Sequencer;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.ThrottlePolicy;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingTable;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public final class SourceSession
implements ReplyHandler,
MessageBus.SendBlockedMessages {
    private final AtomicBoolean destroyed = new AtomicBoolean(false);
    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicBoolean sendingBlockedToken = new AtomicBoolean(false);
    private final Object lock = new Object();
    private final MessageBus mbus;
    private final Sequencer sequencer;
    private final ReplyHandler replyHandler;
    private final ThrottlePolicy throttlePolicy;
    private volatile double timeout;
    private volatile int pendingCount = 0;
    private volatile boolean closed = false;
    private final Deque<BlockedMessage> blockedQ = new LinkedList<BlockedMessage>();
    private static ThreadLocal<Counter> sendBlockedRecurseLevel = ThreadLocal.withInitial(() -> new Counter());

    SourceSession(MessageBus mbus, SourceSessionParams params) {
        this.mbus = mbus;
        this.sequencer = new Sequencer(mbus);
        if (!params.hasReplyHandler()) {
            throw new NullPointerException("Reply handler is null.");
        }
        this.replyHandler = params.getReplyHandler();
        this.throttlePolicy = params.getThrottlePolicy();
        this.timeout = params.getTimeout();
        mbus.register(this);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean destroy() {
        if (this.destroyed.getAndSet(true)) {
            return false;
        }
        Object object = this.lock;
        synchronized (object) {
            this.closed = true;
        }
        this.sequencer.destroy();
        this.mbus.sync();
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        Object object = this.lock;
        synchronized (object) {
            this.closed = true;
        }
        if (this.pendingCount == 0) {
            this.done.countDown();
        }
        try {
            this.done.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.destroy();
    }

    public Result send(Message message) {
        return this.sendInternal(this.updateTiming(message));
    }

    private Message updateTiming(Message msg) {
        msg.setTimeReceivedNow();
        if (msg.getTimeRemaining() <= 0L) {
            msg.setTimeRemaining((long)this.timeout * 1000L);
        }
        return msg;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Result sendInternal(Message message) {
        Object object = this.lock;
        synchronized (object) {
            if (this.closed) {
                return new Result(200001, "Source session is closed.");
            }
            if (this.throttlePolicy != null && !this.throttlePolicy.canSend(message, this.pendingCount)) {
                return new Result(100001, "Too much pending data (" + this.pendingCount + " messages).");
            }
            message.pushHandler(this.replyHandler);
            if (this.throttlePolicy != null) {
                this.throttlePolicy.processMessage(message);
            }
            ++this.pendingCount;
        }
        if (message.getTrace().shouldTrace(6)) {
            message.getTrace().trace(6, "Source session accepted a " + message.getApproxSize() + " byte message. " + this.pendingCount + " message(s) now pending.");
        }
        message.pushHandler(this);
        this.sequencer.handleMessage(message);
        return Result.ACCEPTED;
    }

    @Override
    public boolean trySend() {
        if (this.destroyed.get()) {
            return false;
        }
        this.sendBlockedMessages();
        this.expireStalledBlockedMessages();
        return true;
    }

    private Reply createSendTimedOutReply(Message msg, Error error) {
        EmptyReply reply = new EmptyReply();
        reply.setMessage(msg);
        reply.addError(error);
        msg.swapState(reply);
        return reply;
    }

    private static boolean isSendQFull(Result res) {
        return !res.isAccepted() && res.getError().getCode() == 100001;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Result sendBlocking(Message msg) throws InterruptedException {
        Result res = this.send(msg);
        if (SourceSession.isSendQFull(res)) {
            BlockedMessage blockedMessage = new BlockedMessage(msg);
            Deque<BlockedMessage> deque = this.blockedQ;
            synchronized (deque) {
                this.blockedQ.add(blockedMessage);
            }
            res = blockedMessage.waitComplete();
        }
        return res;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void expireStalledBlockedMessages() {
        Deque<BlockedMessage> deque = this.blockedQ;
        synchronized (deque) {
            this.blockedQ.removeIf(BlockedMessage::notifyIfExpired);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private BlockedMessage getNextBlockedMessage() {
        Deque<BlockedMessage> deque = this.blockedQ;
        synchronized (deque) {
            return this.blockedQ.poll();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendBlockedMessages() {
        Counter recurselevel = sendBlockedRecurseLevel.get();
        if (recurselevel.enough()) {
            return;
        }
        boolean someoneElseIsTakingCareOfIt = this.sendingBlockedToken.getAndSet(true);
        if (someoneElseIsTakingCareOfIt) {
            return;
        }
        try {
            recurselevel.inc();
            BlockedMessage msg = this.getNextBlockedMessage();
            boolean success = true;
            while (success && msg != null) {
                success = msg.sendOrExpire();
                if (!success) {
                    Deque<BlockedMessage> deque = this.blockedQ;
                    synchronized (deque) {
                        this.blockedQ.addFirst(msg);
                        continue;
                    }
                }
                msg = this.getNextBlockedMessage();
            }
        }
        finally {
            recurselevel.dec();
            this.sendingBlockedToken.set(false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void handleReply(Reply reply) {
        boolean done;
        if (this.destroyed.get()) {
            reply.discard();
            return;
        }
        Object object = this.lock;
        synchronized (object) {
            --this.pendingCount;
            if (this.throttlePolicy != null) {
                this.throttlePolicy.processReply(reply);
            }
            done = this.closed && this.pendingCount == 0;
        }
        this.sendBlockedMessages();
        if (reply.getTrace().shouldTrace(6)) {
            reply.getTrace().trace(6, "Source session received reply. " + this.pendingCount + " message(s) now pending.");
        }
        ReplyHandler handler = reply.popHandler();
        handler.handleReply(reply);
        if (done) {
            this.done.countDown();
        }
    }

    public Result send(Message msg, Route route) {
        return this.send(msg.setRoute(route));
    }

    public Result send(Message msg, String routeName) {
        return this.send(msg, routeName, false);
    }

    public Result send(Message msg, String routeName, boolean parseIfNotFound) {
        boolean found = false;
        RoutingTable table = this.mbus.getRoutingTable(msg.getProtocol().toString());
        if (table != null) {
            Route route = table.getRoute(routeName);
            if (route != null) {
                msg.setRoute(new Route(route));
                found = true;
            } else if (!parseIfNotFound) {
                return new Result(200002, "Route '" + routeName + "' not found for protocol '" + msg.getProtocol() + "'.");
            }
        } else if (!parseIfNotFound) {
            return new Result(200002, "Protocol '" + msg.getProtocol() + "' has no routing table.");
        }
        if (!found) {
            msg.setRoute(Route.parse(routeName));
        }
        return this.send(msg);
    }

    public ReplyHandler getReplyHandler() {
        return this.replyHandler;
    }

    public int getPendingCount() {
        return this.pendingCount;
    }

    public SourceSession setTimeout(double timeout) {
        this.timeout = timeout;
        return this;
    }

    public ThrottlePolicy getThrottlePolicy() {
        return this.throttlePolicy;
    }

    private class BlockedMessage {
        private final Message msg;
        private Result result = null;

        BlockedMessage(Message msg) {
            this.msg = msg;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void notifyComplete(Result result) {
            BlockedMessage blockedMessage = this;
            synchronized (blockedMessage) {
                this.result = result;
                this.notify();
            }
        }

        boolean notifyIfExpired() {
            if (this.msg.isExpired()) {
                Error error = new Error(200009, "Timed out in sendQ");
                this.notifyComplete(new Result(error));
                SourceSession.this.replyHandler.handleReply(SourceSession.this.createSendTimedOutReply(this.msg, error));
                return true;
            }
            return false;
        }

        boolean sendOrExpire() {
            if (!this.notifyIfExpired()) {
                Result res = SourceSession.this.sendInternal(this.msg);
                if (!SourceSession.isSendQFull(res)) {
                    this.notifyComplete(res);
                } else {
                    return false;
                }
            }
            return true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        Result waitComplete() throws InterruptedException {
            BlockedMessage blockedMessage = this;
            synchronized (blockedMessage) {
                while (this.result == null) {
                    this.wait();
                }
            }
            return this.result;
        }
    }

    private static final class Counter {
        private int count = 0;

        private Counter() {
        }

        void inc() {
            ++this.count;
        }

        void dec() {
            --this.count;
        }

        boolean enough() {
            return this.count > 5;
        }
    }
}

