/*
 * Decompiled with CFR 0.152.
 */
package com.github.jlangch.venice.util.ipc;

import com.github.jlangch.venice.InterruptedException;
import com.github.jlangch.venice.TimeoutException;
import com.github.jlangch.venice.VncException;
import com.github.jlangch.venice.impl.threadpool.ManagedCachedThreadPoolExecutor;
import com.github.jlangch.venice.impl.types.collections.VncMap;
import com.github.jlangch.venice.impl.util.CollectionUtil;
import com.github.jlangch.venice.util.ipc.IMessage;
import com.github.jlangch.venice.util.ipc.MessageType;
import com.github.jlangch.venice.util.ipc.ResponseStatus;
import com.github.jlangch.venice.util.ipc.impl.Message;
import com.github.jlangch.venice.util.ipc.impl.Protocol;
import com.github.jlangch.venice.util.ipc.impl.TcpSubscriptionListener;
import com.github.jlangch.venice.util.ipc.impl.Topics;
import com.github.jlangch.venice.util.ipc.impl.util.IO;
import com.github.jlangch.venice.util.ipc.impl.util.Json;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class TcpClient
implements Closeable {
    public static final long MESSAGE_LIMIT_MIN = 2048L;
    public static final long MESSAGE_LIMIT_MAX = 0xC800000L;
    private final String host;
    private final int port;
    private final String endpointId;
    private final AtomicBoolean opened = new AtomicBoolean(false);
    private final AtomicReference<SocketChannel> channel = new AtomicReference();
    private final AtomicBoolean subscription = new AtomicBoolean(false);
    private final AtomicLong maxMessageSize = new AtomicLong(0xC800000L);
    private final AtomicLong messageSentCount = new AtomicLong(0L);
    private final AtomicLong messageReceiveCount = new AtomicLong(0L);
    private final ManagedCachedThreadPoolExecutor mngdExecutor = new ManagedCachedThreadPoolExecutor("venice-tcpclient-pool", 10);

    public TcpClient(int port) {
        this("127.0.0.1", port);
    }

    public TcpClient(String host, int port) {
        this.host = host;
        this.port = port;
        this.endpointId = UUID.randomUUID().toString();
    }

    public TcpClient setMaximumParallelTasks(int count) {
        this.mngdExecutor.setMaximumThreadPoolSize(Math.max(1, count));
        return this;
    }

    public TcpClient setMaximumMessageSize(long maxSize) {
        this.maxMessageSize.set(Math.max(2048L, Math.min(0xC800000L, maxSize)));
        return this;
    }

    public long getMaximumMessageSize() {
        return this.maxMessageSize.get();
    }

    public long getMessageSendCount() {
        return this.messageSentCount.get();
    }

    public long getMessageReceiveCount() {
        return this.messageReceiveCount.get();
    }

    public String getEndpointId() {
        return this.endpointId;
    }

    public void open() {
        if (this.opened.compareAndSet(false, true)) {
            SocketChannel ch = null;
            try {
                ch = SocketChannel.open(new InetSocketAddress(this.host, this.port));
                this.channel.set(ch);
            }
            catch (Exception ex) {
                IO.safeClose(ch);
                this.opened.set(false);
                this.channel.set(null);
                throw new VncException("Failed to open TcpClient for server " + this.host + "/" + this.port + "!", ex);
            }
        } else {
            throw new VncException("This TcpClient is already open!");
        }
    }

    public boolean isRunning() {
        SocketChannel ch = this.channel.get();
        return ch != null && ch.isOpen();
    }

    @Override
    public void close() throws IOException {
        if (this.opened.compareAndSet(true, false)) {
            IO.safeClose(this.channel.get());
            this.channel.set(null);
        }
    }

    public IMessage sendMessage(IMessage msg) {
        Objects.requireNonNull(msg);
        Message m = ((Message)msg).withType(MessageType.REQUEST, false);
        return this.send(m);
    }

    public void sendMessageOneway(IMessage msg) {
        Objects.requireNonNull(msg);
        Message m = ((Message)msg).withType(MessageType.REQUEST, true);
        this.send(m);
    }

    public IMessage sendMessage(IMessage msg, long timeout, TimeUnit unit) {
        Objects.requireNonNull(msg);
        Objects.requireNonNull(unit);
        Message m = ((Message)msg).withType(MessageType.REQUEST, false);
        return this.send(m, timeout, unit);
    }

    public Future<IMessage> sendMessageAsync(IMessage msg) {
        Objects.requireNonNull(msg);
        Message m = ((Message)msg).withType(MessageType.REQUEST, false);
        return this.sendAsync(m);
    }

    public IMessage subscribe(String topic, Consumer<IMessage> handler) {
        Objects.requireNonNull(topic);
        Objects.requireNonNull(handler);
        return this.subscribe(CollectionUtil.toSet(topic), handler);
    }

    public IMessage subscribe(Set<String> topics, Consumer<IMessage> handler) {
        Objects.requireNonNull(topics);
        Objects.requireNonNull(handler);
        if (topics.isEmpty()) {
            throw new VncException("A subscription topic set must not be empty!");
        }
        SocketChannel ch = this.channel.get();
        if (ch == null || !ch.isOpen()) {
            throw new VncException("This TcpClient is not open!");
        }
        Message subscribeMsg = new Message(MessageType.SUBSCRIBE, ResponseStatus.NULL, false, Topics.of(topics), "text/plain", "UTF-8", TcpClient.toBytes(this.endpointId, "UTF-8"));
        if (this.subscription.compareAndSet(false, true)) {
            try {
                Callable<Message> task = () -> {
                    Protocol.sendMessage(ch, subscribeMsg);
                    this.messageSentCount.incrementAndGet();
                    Message response = Protocol.receiveMessage(ch);
                    this.messageReceiveCount.incrementAndGet();
                    return response;
                };
                Message response = this.mngdExecutor.getExecutor().submit(task).get(5L, TimeUnit.SECONDS);
                if (response.getResponseStatus() == ResponseStatus.OK) {
                    this.mngdExecutor.getExecutor().submit(new TcpSubscriptionListener(ch, handler));
                    return response;
                }
                throw new VncException("Failed to start subscription mode");
            }
            catch (java.util.concurrent.TimeoutException ex) {
                throw new TimeoutException("Timeout while waiting for IPC response.");
            }
            catch (ExecutionException ex) {
                Throwable cause = ex.getCause();
                if (cause instanceof VncException) {
                    throw (VncException)cause;
                }
                throw new VncException("Error in IPC call", cause);
            }
            catch (java.lang.InterruptedException ex) {
                this.subscription.set(false);
                throw new InterruptedException("Interrupted while waiting for IPC response.");
            }
            catch (Exception ex) {
                this.subscription.set(false);
                throw ex;
            }
        }
        throw new VncException("The client is already in subscription mode!");
    }

    public IMessage publish(IMessage msg) {
        Objects.requireNonNull(msg);
        this.validateMessageSize(msg);
        Message m = ((Message)msg).withType(MessageType.PUBLISH, false);
        if (this.subscription.get()) {
            return this.sendThroughTemporaryClient(m, 5L, TimeUnit.SECONDS);
        }
        return this.send(m, 5L, TimeUnit.SECONDS);
    }

    public IMessage offer(IMessage msg, String queueName, long timeout, TimeUnit unit) {
        Objects.requireNonNull(msg);
        Objects.requireNonNull(queueName);
        Objects.requireNonNull(unit);
        Message m = new Message(null, MessageType.OFFER, ResponseStatus.NULL, false, queueName, -1L, ((Message)msg).getTopics(), ((Message)msg).getMimetype(), ((Message)msg).getCharset(), ((Message)msg).getData());
        return this.send(m, timeout, unit);
    }

    public IMessage poll(String queueName, long timeout, TimeUnit unit) {
        Objects.requireNonNull(queueName);
        Objects.requireNonNull(unit);
        Message m = new Message(null, MessageType.POLL, ResponseStatus.NULL, false, queueName, -1L, Topics.of("queue/poll"), "application/octet-stream", null, new byte[0]);
        return this.send(m, timeout, unit);
    }

    private IMessage send(IMessage msg) {
        Objects.requireNonNull(msg);
        this.validateMessageSize(msg);
        if (this.subscription.get()) {
            throw new VncException("A client in subscription mode cannot send request messages!");
        }
        SocketChannel ch = this.channel.get();
        if (ch == null || !ch.isOpen()) {
            throw new VncException("This TcpClient is not open!");
        }
        Message localResponse = this.handleClientLocalMessage(msg);
        if (localResponse != null) {
            return localResponse;
        }
        Protocol.sendMessage(ch, (Message)msg);
        this.messageSentCount.incrementAndGet();
        if (msg.isOneway()) {
            return null;
        }
        Message response = Protocol.receiveMessage(ch);
        this.messageReceiveCount.incrementAndGet();
        return response;
    }

    private IMessage send(IMessage msg, long timeout, TimeUnit unit) {
        Objects.requireNonNull(msg);
        Objects.requireNonNull(unit);
        this.validateMessageSize(msg);
        if (this.subscription.get()) {
            throw new VncException("A client in subscription mode cannot send request messages!");
        }
        Message localResponse = this.handleClientLocalMessage(msg);
        if (localResponse != null) {
            return localResponse;
        }
        try {
            return this.sendAsync(msg).get(timeout, unit);
        }
        catch (VncException ex) {
            throw ex;
        }
        catch (java.util.concurrent.TimeoutException ex) {
            throw new TimeoutException("Timeout while waiting for IPC response.");
        }
        catch (ExecutionException ex) {
            Throwable cause = ex.getCause();
            if (cause instanceof VncException) {
                throw (VncException)cause;
            }
            throw new VncException("Error in IPC call", cause);
        }
        catch (java.lang.InterruptedException ex) {
            throw new InterruptedException("Interrupted while waiting for IPC response.");
        }
    }

    private Future<IMessage> sendAsync(IMessage msg) {
        Objects.requireNonNull(msg);
        this.validateMessageSize(msg);
        if (this.subscription.get()) {
            throw new VncException("A client in subscription mode cannot send request messages!");
        }
        SocketChannel ch = this.channel.get();
        if (ch == null || !ch.isOpen()) {
            throw new VncException("This TcpClient is not open!");
        }
        Message localResponse = this.handleClientLocalMessage(msg);
        if (localResponse != null) {
            return this.mngdExecutor.getExecutor().submit(() -> localResponse);
        }
        Callable<IMessage> task = () -> {
            Protocol.sendMessage(ch, (Message)msg);
            this.messageSentCount.incrementAndGet();
            if (msg.isOneway()) {
                return null;
            }
            Message response = Protocol.receiveMessage(ch);
            this.messageReceiveCount.incrementAndGet();
            return response;
        };
        return this.mngdExecutor.getExecutor().submit(task);
    }

    private IMessage sendThroughTemporaryClient(IMessage msg, long timeout, TimeUnit unit) {
        Objects.requireNonNull(msg);
        Objects.requireNonNull(unit);
        IMessage response = null;
        try (TcpClient client = new TcpClient(this.host, this.port);){
            client.open();
            response = client.send(msg, timeout, unit);
        }
        catch (IOException iOException) {
            // empty catch block
        }
        return response;
    }

    private Message handleClientLocalMessage(IMessage request) {
        if ("client/thread-pool-statistics".equals(request.getTopic())) {
            return this.getClientThreadPoolStatistics();
        }
        return null;
    }

    private Message getClientThreadPoolStatistics() {
        VncMap statistics = this.mngdExecutor.info();
        return new Message(MessageType.RESPONSE, ResponseStatus.OK, false, Topics.of("client/thread-pool-statistics"), "application/json", "UTF-8", TcpClient.toBytes(Json.writeJson(statistics, false), "UTF-8"));
    }

    private void validateMessageSize(IMessage msg) {
        Objects.requireNonNull(msg);
        if ((long)msg.getData().length > this.maxMessageSize.get()) {
            throw new VncException(String.format("The message (%dB) is too large! The limit is at %dB", msg.getData().length, this.maxMessageSize.get()));
        }
    }

    private static byte[] toBytes(String s, String charset) {
        return s.getBytes(Charset.forName(charset));
    }
}

