/*
 * Decompiled with CFR 0.152.
 */
package quickfix.mina;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import quickfix.LogUtil;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.mina.EventHandlingStrategy;
import quickfix.mina.QueueTracker;
import quickfix.mina.QueueTrackers;
import quickfix.mina.SessionConnector;

/**
 * Event handling strategy that processes the messages of every session on a
 * dispatcher of its own.
 *
 * <p>A {@link MessageDispatchingThread} is created lazily for a session the first
 * time {@link #onMessage(Session, Message)} is called for it. Each dispatcher owns
 * a message queue and hands the queued messages to {@link Session#next(Message)}
 * one at a time. Dispatchers run on the {@link Executor} configured through
 * {@link #setExecutor(Executor)}; when none is configured each dispatcher starts a
 * dedicated thread.
 */
public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy {
    /** Active dispatchers keyed by session; a dispatcher removes itself when its run loop ends. */
    private final ConcurrentMap<SessionID, MessageDispatchingThread> dispatchers = new ConcurrentHashMap<>();
    private final SessionConnector sessionConnector;
    /** Capacity of each dispatcher queue, or a negative value when the queues are unbounded. */
    private final int queueCapacity;
    /** Lower watermark handed to the watermark tracker; only used when both watermarks are positive. */
    private final int queueLowerWatermark;
    /** Upper watermark handed to the watermark tracker; only used when both watermarks are positive. */
    private final int queueUpperWatermark;
    /** Executor used for dispatchers created from now on; {@code null} means one new thread per dispatcher. */
    private volatile Executor executor;

    /**
     * Creates a strategy whose dispatcher queues are bounded by a fixed capacity.
     * Watermark tracking is disabled (both watermarks are set to {@code -1}).
     *
     * @param connector     connector returned by {@link #getSessionConnector()}
     * @param queueCapacity capacity of each per-session queue; a negative value
     *                      selects an unbounded queue. NOTE(review): a value of
     *                      {@code 0} takes the bounded branch, and
     *                      {@link LinkedBlockingQueue} rejects a capacity of zero
     *                      with an {@link IllegalArgumentException} when the first
     *                      dispatcher is created.
     */
    public ThreadPerSessionEventHandlingStrategy(SessionConnector connector, int queueCapacity) {
        this.sessionConnector = connector;
        this.queueCapacity = queueCapacity;
        this.queueLowerWatermark = -1;
        this.queueUpperWatermark = -1;
    }

    /**
     * Creates a strategy whose dispatcher queues are unbounded. When both
     * watermarks are greater than zero each queue is wrapped in a single-session
     * watermark tracker; otherwise the default tracker is used.
     *
     * @param connector           connector returned by {@link #getSessionConnector()}
     * @param queueLowerWatermark lower watermark passed to the watermark tracker
     * @param queueUpperWatermark upper watermark passed to the watermark tracker
     */
    public ThreadPerSessionEventHandlingStrategy(SessionConnector connector, int queueLowerWatermark, int queueUpperWatermark) {
        this.sessionConnector = connector;
        this.queueCapacity = -1;
        this.queueLowerWatermark = queueLowerWatermark;
        this.queueUpperWatermark = queueUpperWatermark;
    }

    /**
     * Sets the executor used to run dispatchers created after this call.
     * Dispatchers that already exist keep the executor they were created with.
     *
     * @param executor executor to run dispatchers on, or {@code null} to let every
     *                 dispatcher start its own thread
     */
    public void setExecutor(Executor executor) {
        this.executor = executor;
    }

    /**
     * Creates (but does not start) a dispatcher for the given session, bound to
     * the currently configured executor.
     */
    MessageDispatchingThread createDispatcherThread(Session quickfixSession) {
        return new MessageDispatchingThread(quickfixSession, executor);
    }

    /**
     * Queues a message for the given session, creating and starting the session's
     * dispatcher first if it does not exist yet.
     *
     * @param quickfixSession session the message belongs to
     * @param message         message to queue; when {@code null} the dispatcher is
     *                        still created and started but nothing is queued
     */
    @Override
    public void onMessage(Session quickfixSession, Message message) {
        // Plain lookup first; computeIfAbsent is only needed for the first message of a session.
        MessageDispatchingThread dispatcher = dispatchers.get(quickfixSession.getSessionID());
        if (dispatcher == null) {
            // Creation and start happen inside the mapping function so that only the
            // dispatcher that ends up in the map is ever started.
            dispatcher = dispatchers.computeIfAbsent(quickfixSession.getSessionID(), sessionID -> {
                final MessageDispatchingThread newDispatcher = createDispatcherThread(quickfixSession);
                startDispatcherThread(newDispatcher);
                return newDispatcher;
            });
        }
        if (message != null) {
            dispatcher.enqueue(message);
        }
    }

    /** Returns the connector this strategy was created with. */
    @Override
    public SessionConnector getSessionConnector() {
        return sessionConnector;
    }

    /**
     * Starts the given dispatcher. Kept as a separate protected method so that
     * subclasses can intercept dispatcher start-up.
     */
    protected void startDispatcherThread(MessageDispatchingThread dispatcher) {
        dispatcher.start();
    }

    /**
     * Asks every dispatcher to stop and waits until all of them report
     * {@link MessageDispatchingThread#isStopped()} and have been removed from the
     * dispatcher map.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status
     * is restored and the wait loop keeps going.
     */
    public void stopDispatcherThreads() {
        // values() is a live view of the map: removing from it removes the mapping,
        // and dispatchers that remove themselves in doRun() disappear from it too.
        final Collection<MessageDispatchingThread> dispatchersToShutdown = dispatchers.values();
        for (MessageDispatchingThread dispatcher : dispatchersToShutdown) {
            dispatcher.stopDispatcher();
        }
        while (!dispatchersToShutdown.isEmpty()) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            dispatchersToShutdown.removeIf(MessageDispatchingThread::isStopped);
        }
    }

    /**
     * Returns the dispatcher currently registered for the session.
     *
     * @return the dispatcher, or {@code null} if none is registered
     */
    protected MessageDispatchingThread getDispatcher(SessionID sessionID) {
        return dispatchers.get(sessionID);
    }

    /**
     * Fetches the next message for a dispatcher, polling the tracker for at most
     * 250 milliseconds.
     *
     * @return whatever {@code poll} returns; the dispatcher loop treats
     *         {@code null} as "no message available" and polls again
     * @throws InterruptedException if the poll is interrupted
     */
    protected Message getNextMessage(QueueTracker<Message> queueTracker) throws InterruptedException {
        return queueTracker.poll(250L, TimeUnit.MILLISECONDS);
    }

    /** Returns the sum of the queue sizes of all registered dispatchers. */
    @Override
    public int getQueueSize() {
        int total = 0;
        for (MessageDispatchingThread dispatcher : dispatchers.values()) {
            total += dispatcher.getQueueSize();
        }
        return total;
    }

    /**
     * Returns the queue size of the dispatcher registered for the session, or
     * {@code 0} if the session has no dispatcher.
     */
    @Override
    public int getQueueSize(SessionID sessionID) {
        final MessageDispatchingThread dispatcher = dispatchers.get(sessionID);
        return dispatcher != null ? dispatcher.getQueueSize() : 0;
    }

    /**
     * Dispatcher for a single session: owns the session's message queue and feeds
     * queued messages to {@link Session#next(Message)} until it is told to stop.
     */
    protected class MessageDispatchingThread extends ThreadAdapter {
        private final Session quickfixSession;
        private final BlockingQueue<Message> messages;
        /** Wrapper around {@link #messages} through which all put/poll/drain calls go. */
        private final QueueTracker<Message> queueTracker;
        /** Set when the run loop has finished, and also directly by {@link #stopDispatcher()}. */
        private volatile boolean stopped;
        /** Set to make the run loop exit after its current iteration. */
        private volatile boolean stopping;

        /**
         * Builds the queue and tracker according to the enclosing strategy's
         * configuration: a bounded queue when a non-negative capacity was given,
         * otherwise an unbounded queue that is watermark-tracked only if both
         * watermarks are positive.
         */
        private MessageDispatchingThread(Session session, Executor executor) {
            super("QF/J Session dispatcher: " + session.getSessionID(), executor);
            this.quickfixSession = session;
            if (queueCapacity >= 0) {
                this.messages = new LinkedBlockingQueue<>(queueCapacity);
                this.queueTracker = QueueTrackers.newDefaultQueueTracker(this.messages);
            } else {
                this.messages = new LinkedBlockingQueue<>();
                if (queueLowerWatermark > 0 && queueUpperWatermark > 0) {
                    this.queueTracker = QueueTrackers.newSingleSessionWatermarkTracker(
                            this.messages, queueLowerWatermark, queueUpperWatermark, this.quickfixSession);
                } else {
                    this.queueTracker = QueueTrackers.newDefaultQueueTracker(this.messages);
                }
            }
        }

        /**
         * Puts a message on this dispatcher's queue.
         *
         * <p>An {@code END_OF_STREAM} marker is ignored once the dispatcher is
         * already stopping. If the put is interrupted the exception is reported to
         * the session log and the method returns normally.
         *
         * @param message message to queue
         */
        public void enqueue(Message message) {
            if (message == EventHandlingStrategy.END_OF_STREAM && stopping) {
                return;
            }
            try {
                queueTracker.put(message);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here; callers
                // cannot tell that the put was interrupted other than via the log.
                quickfixSession.getLog().onErrorEvent(e.toString());
            }
        }

        /** Returns the number of messages currently in this dispatcher's queue. */
        public int getQueueSize() {
            return messages.size();
        }

        /**
         * Run loop: hands messages to the session until {@code stopping} is set
         * (by {@link #stopDispatcher()}, by an {@code END_OF_STREAM} marker, or by
         * an interrupt), then processes whatever is still queued, unregisters this
         * dispatcher and marks it stopped.
         */
        @Override
        void doRun() {
            while (!stopping) {
                try {
                    final Message message = getNextMessage(queueTracker);
                    if (message != null) {
                        quickfixSession.next(message);
                        if (message == EventHandlingStrategy.END_OF_STREAM) {
                            stopping = true;
                        }
                    }
                } catch (InterruptedException e) {
                    LogUtil.logThrowable(quickfixSession.getSessionID(), "Message dispatcher interrupted", (Throwable) e);
                    stopping = true;
                } catch (Throwable e) {
                    // A failure while processing one message must not end the dispatcher.
                    LogUtil.logThrowable(quickfixSession.getSessionID(), "Error during message processing", e);
                }
            }
            // Messages that arrived before the loop noticed the stop request are still processed.
            if (!messages.isEmpty()) {
                final ArrayList<Message> remaining = new ArrayList<>(messages.size());
                queueTracker.drainTo(remaining);
                for (Message message : remaining) {
                    try {
                        quickfixSession.next(message);
                    } catch (Throwable e) {
                        LogUtil.logThrowable(quickfixSession.getSessionID(), "Error during message processing", e);
                    }
                }
            }
            dispatchers.remove(quickfixSession.getSessionID());
            stopped = true;
        }

        /**
         * Requests this dispatcher to stop: queues an {@code END_OF_STREAM} marker
         * and sets both the stopping and the stopped flag.
         *
         * <p>NOTE(review): because the stopped flag is set here directly,
         * {@link #isStopped()} can return {@code true} before {@link #doRun()} has
         * actually finished.
         */
        public void stopDispatcher() {
            // The marker must be queued before the flag is raised; enqueue() drops
            // END_OF_STREAM once stopping is already true.
            enqueue(EventHandlingStrategy.END_OF_STREAM);
            stopping = true;
            stopped = true;
        }

        /**
         * Returns {@code true} once {@link #stopDispatcher()} has been called or
         * the run loop has finished.
         */
        public boolean isStopped() {
            return stopped;
        }
    }

    /**
     * Runnable that knows how to start itself on an executor and that gives the
     * executing thread a descriptive name for the duration of the run.
     */
    protected static abstract class ThreadAdapter implements Runnable {
        private final Executor executor;
        private final String name;

        /**
         * @param name     name applied to the executing thread while running
         * @param executor executor to run on; when {@code null} a
         *                 {@link DedicatedThreadExecutor} is used instead
         */
        public ThreadAdapter(String name, Executor executor) {
            this.name = name;
            this.executor = executor != null ? executor : new DedicatedThreadExecutor(name);
        }

        /** Submits this adapter to its executor. */
        public void start() {
            executor.execute(this);
        }

        /**
         * Runs {@link #doRun()} with the current thread temporarily renamed to
         * {@code "<name> (<original thread name>)"}, unless the thread already
         * carries exactly this adapter's name. The original thread name is always
         * restored afterwards, even if {@code doRun()} throws.
         */
        @Override
        public final void run() {
            final Thread currentThread = Thread.currentThread();
            final String originalName = currentThread.getName();
            try {
                if (!name.equals(originalName)) {
                    currentThread.setName(name + " (" + originalName + ")");
                }
                doRun();
            } finally {
                currentThread.setName(originalName);
            }
        }

        /** The actual work to perform on the executing thread. */
        abstract void doRun();

        /** Executor that runs every command on a newly created thread with a fixed name. */
        static final class DedicatedThreadExecutor implements Executor {
            private final String name;

            DedicatedThreadExecutor(String name) {
                this.name = name;
            }

            @Override
            public void execute(Runnable command) {
                new Thread(command, name).start();
            }
        }
    }
}

