/*
 * Decompiled with CFR 0.152.
 */
package com.github.jlangch.venice.util.ipc.impl;

import com.github.jlangch.venice.impl.types.collections.VncMap;
import com.github.jlangch.venice.util.ipc.IMessage;
import com.github.jlangch.venice.util.ipc.MessageType;
import com.github.jlangch.venice.util.ipc.ResponseStatus;
import com.github.jlangch.venice.util.ipc.TcpServer;
import com.github.jlangch.venice.util.ipc.impl.IPublisher;
import com.github.jlangch.venice.util.ipc.impl.Message;
import com.github.jlangch.venice.util.ipc.impl.Protocol;
import com.github.jlangch.venice.util.ipc.impl.ServerStatistics;
import com.github.jlangch.venice.util.ipc.impl.Subscriptions;
import com.github.jlangch.venice.util.ipc.impl.Topics;
import com.github.jlangch.venice.util.ipc.impl.util.Error;
import com.github.jlangch.venice.util.ipc.impl.util.ErrorCircularBuffer;
import com.github.jlangch.venice.util.ipc.impl.util.ExceptionUtil;
import com.github.jlangch.venice.util.ipc.impl.util.IO;
import com.github.jlangch.venice.util.ipc.impl.util.Json;
import com.github.jlangch.venice.util.ipc.impl.util.JsonBuilder;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;

public class TcpServerConnection
implements IPublisher,
Runnable {
    public static final int ERROR_QUEUE_CAPACITY = 50;
    private State mode = State.Request_Response;
    private final TcpServer server;
    private final SocketChannel ch;
    private final Function<IMessage, IMessage> handler;
    private final AtomicLong maxMessageSize;
    private final Subscriptions subscriptions;
    private final int publishQueueCapacity;
    private final ServerStatistics statistics;
    private final Supplier<VncMap> serverThreadPoolStatistics;
    private final ErrorCircularBuffer<Error> errorBuffer;
    private final LinkedBlockingQueue<Message> publishQueue;
    private final Map<String, LinkedBlockingQueue<Message>> p2pQueues;

    public TcpServerConnection(TcpServer server, SocketChannel ch, Function<IMessage, IMessage> handler, AtomicLong maxMessageSize, Subscriptions subscriptions, int publishQueueCapacity, Map<String, LinkedBlockingQueue<Message>> p2pQueues, ServerStatistics statistics, Supplier<VncMap> serverThreadPoolStatistics) {
        this.server = server;
        this.ch = ch;
        this.handler = handler;
        this.maxMessageSize = maxMessageSize;
        this.subscriptions = subscriptions;
        this.publishQueueCapacity = publishQueueCapacity;
        this.statistics = statistics;
        this.serverThreadPoolStatistics = serverThreadPoolStatistics;
        this.publishQueue = new LinkedBlockingQueue(publishQueueCapacity);
        this.errorBuffer = new ErrorCircularBuffer(50);
        this.p2pQueues = p2pQueues;
    }

    @Override
    public void run() {
        try {
            this.statistics.incrementConnectionCount();
            while (this.mode != State.Terminated && this.server.isRunning() && this.ch.isOpen()) {
                if (this.mode == State.Request_Response) {
                    this.mode = this.processRequestResponse();
                    continue;
                }
                if (this.mode == State.Publish) {
                    this.mode = this.processPublication();
                    continue;
                }
                break;
            }
        }
        catch (Exception exception) {
        }
        finally {
            this.statistics.decrementConnectionCount();
            this.subscriptions.removeSubscriptions(this);
            IO.safeClose(this.ch);
        }
    }

    @Override
    public void publish(Message msg) {
        try {
            this.publishQueue.offer(msg, 1L, TimeUnit.SECONDS);
        }
        catch (Exception ex) {
            this.errorBuffer.push(new Error("Failed to enque message for publishing!", msg, ex));
            this.statistics.incrementDiscardedPublishCount();
        }
    }

    private State processRequestResponse() throws InterruptedException {
        Message request = Protocol.receiveMessage(this.ch);
        if (request == null) {
            return State.Terminated;
        }
        this.statistics.incrementMessageCount();
        if (!this.server.isRunning()) {
            return State.Terminated;
        }
        if (!(TcpServerConnection.isRequestMsg(request) || TcpServerConnection.isRequestPublish(request) || TcpServerConnection.isRequestSubscribe(request) || TcpServerConnection.isRequestOffer(request) || TcpServerConnection.isRequestPoll(request))) {
            this.handleInvalidRequestType(request);
            return this.mode;
        }
        if ((long)request.getData().length > this.maxMessageSize.get()) {
            this.handleRequestTooLarge(request);
            return this.mode;
        }
        if (request.getTopic().startsWith("tcp-server/")) {
            this.handleTcpServerRequest(request);
            return State.Request_Response;
        }
        if (TcpServerConnection.isRequestPublish(request)) {
            this.handlePublish(request);
            return State.Request_Response;
        }
        if (TcpServerConnection.isRequestMsg(request)) {
            Message response = this.handleRequest(request);
            if (!this.server.isRunning()) {
                return State.Terminated;
            }
            if (response != null && !request.isOneway()) {
                Protocol.sendMessage(this.ch, response);
            }
            return State.Request_Response;
        }
        if (TcpServerConnection.isRequestOffer(request)) {
            this.handleOffer(request);
            return State.Request_Response;
        }
        if (TcpServerConnection.isRequestPoll(request)) {
            this.handlePoll(request);
            return State.Request_Response;
        }
        if (TcpServerConnection.isRequestSubscribe(request)) {
            this.handleSubscribe(request);
            return State.Publish;
        }
        return State.Request_Response;
    }

    private State processPublication() throws InterruptedException {
        Message msg = this.publishQueue.poll(5L, TimeUnit.SECONDS);
        if (msg != null) {
            this.statistics.incrementPublishCount();
            Protocol.sendMessage(this.ch, msg.withType(MessageType.REQUEST, true));
        }
        return State.Publish;
    }

    private Message handleRequest(Message request) {
        try {
            IMessage response = this.handler.apply(request);
            if (request.isOneway()) {
                return null;
            }
            if (response == null) {
                return TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.OK, request.getTopic(), "");
            }
            return ((Message)response).withType(MessageType.RESPONSE, true).withResponseStatus(ResponseStatus.OK);
        }
        catch (Exception ex) {
            if (request.isOneway()) {
                return null;
            }
            return TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.HANDLER_ERROR, request.getTopic(), ExceptionUtil.printStackTraceToString(ex));
        }
    }

    private void handleSubscribe(Message request) {
        this.subscriptions.addSubscription(request.getTopicsSet(), (IPublisher)this);
        Protocol.sendMessage(this.ch, TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.OK, request.getTopic(), "Subscribed to the topic."));
    }

    private void handlePublish(Message request) {
        this.subscriptions.publish(request);
        Protocol.sendMessage(this.ch, TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.OK, request.getTopic(), "Message has been enqued to publish."));
    }

    private void handleOffer(Message request) throws InterruptedException {
        String queueName = request.getQueueName();
        LinkedBlockingQueue<Message> queue = this.p2pQueues.get(queueName);
        if (queue != null) {
            Message msg = request.withType(MessageType.REQUEST, false);
            if (queue.offer(msg, 0L, TimeUnit.MILLISECONDS)) {
                Protocol.sendMessage(this.ch, TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.OK, request.getTopic(), "Offered the message to the queue."));
            } else {
                Protocol.sendMessage(this.ch, TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.QUEUE_FULL, request.getTopic(), "Offer rejected! The queue is full."));
            }
        } else {
            Protocol.sendMessage(this.ch, TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.QUEUE_NOT_FOUND, request.getTopic(), "Offer rejected! The queue does not exist."));
        }
    }

    private void handlePoll(Message request) throws InterruptedException {
        String queueName = request.getQueueName();
        LinkedBlockingQueue<Message> queue = this.p2pQueues.get(queueName);
        if (queue != null) {
            Message msg = queue.poll(0L, TimeUnit.MILLISECONDS);
            if (msg != null) {
                Protocol.sendMessage(this.ch, msg.withType(MessageType.RESPONSE, true).withResponseStatus(ResponseStatus.OK));
            } else {
                Protocol.sendMessage(this.ch, TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.QUEUE_EMPTY, request.getTopic(), "Poll rejected! The queue is empty."));
            }
        } else {
            Protocol.sendMessage(this.ch, TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.QUEUE_NOT_FOUND, request.getTopic(), "Poll rejected! The queue does not exist."));
        }
    }

    private void handleTcpServerRequest(Message request) {
        if ("tcp-server/status".equals(request.getTopic())) {
            Protocol.sendMessage(this.ch, this.getTcpServerStatus());
        } else if ("tcp-server/thread-pool-statistics".equals(request.getTopic())) {
            Protocol.sendMessage(this.ch, this.getTcpServerThreadPoolStatistics());
        } else if ("tcp-server/error".equals(request.getTopic())) {
            Protocol.sendMessage(this.ch, this.getTcpServerNextError());
        } else {
            Protocol.sendMessage(this.ch, TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.BAD_REQUEST, request.getTopic(), "Unknown tcp server request topic. \nValid topics are:\n  \u2022 tcp-server/status\n  \u2022 tcp-server/thread-pool-statistics\n  \u2022 tcp-server/error\n"));
        }
    }

    private void handleInvalidRequestType(Message request) {
        if (this.mode == State.Request_Response) {
            if (request.isOneway()) {
                this.errorBuffer.push(new Error("Bad request type '" + (Object)((Object)request.getType()) + "'! Cannot send error response for oneway request!", request));
                this.statistics.incrementDiscardedResponseCount();
            } else {
                Protocol.sendMessage(this.ch, TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.BAD_REQUEST, request.getTopic(), "Bad request type: " + request.getType().name()));
            }
        } else {
            this.errorBuffer.push(new Error("Bad request type '" + (Object)((Object)request.getType()) + "'! Cannot send error response for channel in publish mode!", request));
            this.statistics.incrementDiscardedPublishCount();
        }
    }

    private void handleRequestTooLarge(Message request) {
        if (this.mode == State.Request_Response) {
            if (request.isOneway()) {
                this.errorBuffer.push(new Error("Request too large! Cannot send error response for oneway request!", request));
                this.statistics.incrementDiscardedResponseCount();
            } else {
                this.sendTooLargeMessageResponse(request);
            }
        } else {
            this.statistics.incrementDiscardedPublishCount();
        }
    }

    private void sendTooLargeMessageResponse(Message request) {
        Protocol.sendMessage(this.ch, this.createTooLargeErrorMessageResponse(request));
    }

    private Message getTcpServerStatus() {
        return TcpServerConnection.createJsonResponseMessage(ResponseStatus.OK, "tcp-server/status", new JsonBuilder().add("running", this.server.isRunning()).add("mode", this.mode.name()).add("connection_count", this.statistics.getConnectionCount()).add("message_count", this.statistics.getMessageCount()).add("publish_count", this.statistics.getPublishCount()).add("response_discarded_count", this.statistics.getDiscardedResponseCount()).add("publish_discarded_count", this.statistics.getDiscardedPublishCount()).add("subscription_client_count", this.subscriptions.getClientSubscriptionCount()).add("subscription_topic_count", this.subscriptions.getTopicSubscriptionCount()).add("publish_queue_capacity", this.publishQueueCapacity).add("p2p-queue-count", this.p2pQueues.size()).add("error-queue-capacity", 50).add("message_size_min", 2048L).add("message_size_max", this.maxMessageSize.get()).toJson(false));
    }

    private Message getTcpServerNextError() {
        try {
            Error err = this.errorBuffer.pop();
            if (err == null) {
                return TcpServerConnection.createJsonResponseMessage(ResponseStatus.OK, "tcp-server/error", new JsonBuilder().add("status", "no_errors_available").toJson(false));
            }
            String description = err.getDescription();
            Exception ex = err.getException();
            String exMsg = ex == null ? null : ex.getMessage();
            return TcpServerConnection.createJsonResponseMessage(ResponseStatus.OK, "tcp-server/error", new JsonBuilder().add("status", "error").add("description", description).add("exception", exMsg).add("errors-left", this.errorBuffer.size()).toJson(false));
        }
        catch (Exception ex) {
            return TcpServerConnection.createJsonResponseMessage(ResponseStatus.OK, "tcp-server/error", new JsonBuilder().add("status", "temporarily_unavailable").toJson(false));
        }
    }

    private Message getTcpServerThreadPoolStatistics() {
        VncMap statistics = this.serverThreadPoolStatistics.get();
        return TcpServerConnection.createJsonResponseMessage(ResponseStatus.OK, "tcp-server/thread-pool-statistics", Json.writeJson(statistics, false));
    }

    private static Message createJsonResponseMessage(ResponseStatus status, String topic, String json) {
        return new Message(MessageType.RESPONSE, status, true, Topics.of(topic), "application/json", "UTF-8", TcpServerConnection.toBytes(json, "UTF-8"));
    }

    private static Message createPlainTextResponseMessage(ResponseStatus status, String topic, String text) {
        return new Message(MessageType.RESPONSE, status, true, Topics.of(topic), "text/plain", "UTF-8", TcpServerConnection.toBytes(text, "UTF-8"));
    }

    private Message createTooLargeErrorMessageResponse(Message request) {
        return TcpServerConnection.createPlainTextResponseMessage(ResponseStatus.BAD_REQUEST, request.getTopic(), String.format("The message (%d bytes) is too large! The limit is at %d bytes.", request.getData().length, this.maxMessageSize));
    }

    private static boolean isRequestMsg(Message msg) {
        return msg.getType() == MessageType.REQUEST;
    }

    private static boolean isRequestSubscribe(Message msg) {
        return msg.getType() == MessageType.SUBSCRIBE;
    }

    private static boolean isRequestPublish(Message msg) {
        return msg.getType() == MessageType.PUBLISH;
    }

    private static boolean isRequestOffer(Message msg) {
        return msg.getType() == MessageType.OFFER;
    }

    private static boolean isRequestPoll(Message msg) {
        return msg.getType() == MessageType.POLL;
    }

    private static byte[] toBytes(String s, String charset) {
        return s.getBytes(Charset.forName(charset));
    }

    private static enum State {
        Request_Response,
        Publish,
        Terminated;

    }
}

