/*
 * Decompiled with CFR 0.152.
 */
package org.terracotta.passthrough;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.terracotta.connection.Connection;
import org.terracotta.connection.entity.Entity;
import org.terracotta.connection.entity.EntityRef;
import org.terracotta.entity.EntityClientService;
import org.terracotta.entity.EntityMessage;
import org.terracotta.entity.EntityResponse;
import org.terracotta.entity.MessageCodecException;
import org.terracotta.exception.ConnectionClosedException;
import org.terracotta.exception.EntityException;
import org.terracotta.passthrough.Assert;
import org.terracotta.passthrough.PassthroughConnectionState;
import org.terracotta.passthrough.PassthroughEndpointConnector;
import org.terracotta.passthrough.PassthroughEndpointConnectorImpl;
import org.terracotta.passthrough.PassthroughEntityClientEndpoint;
import org.terracotta.passthrough.PassthroughEntityRef;
import org.terracotta.passthrough.PassthroughMessage;
import org.terracotta.passthrough.PassthroughMessageCodec;
import org.terracotta.passthrough.PassthroughMonitor;
import org.terracotta.passthrough.PassthroughServerProcess;
import org.terracotta.passthrough.PassthroughUncaughtExceptionHandler;
import org.terracotta.passthrough.PassthroughWait;

public class PassthroughConnection
implements Connection {
    private final String connectionName;
    private final String uuid;
    private final PassthroughConnectionState connectionState;
    private final List<EntityClientService<?, ?, ? extends EntityMessage, ? extends EntityResponse, ?>> entityClientServices;
    private long nextClientEndpointID;
    private final Map<Long, PassthroughEntityClientEndpoint<?, ?>> localEndpoints;
    private final Runnable onClose;
    private final long uniqueConnectionID;
    private final PassthroughEndpointConnector endpointConnector;
    private final String readerThreadName;
    private volatile State state = State.INIT;
    private Thread clientThread;
    private final List<ServerToClientMessageRecord> messageQueue;
    private final List<Waiter> clientResponseWaitQueue;
    private Map<Long, PassthroughWait> waitersToResend;

    public PassthroughConnection(String connectionName, String readerThreadName, PassthroughServerProcess serverProcess, List<EntityClientService<?, ?, ? extends EntityMessage, ? extends EntityResponse, ?>> entityClientServices, Runnable onClose, long uniqueConnectionID) {
        this(connectionName, readerThreadName, serverProcess, entityClientServices, onClose, uniqueConnectionID, new PassthroughEndpointConnectorImpl());
    }

    public PassthroughConnection(String connectionName, String readerThreadName, PassthroughServerProcess serverProcess, List<EntityClientService<?, ?, ? extends EntityMessage, ? extends EntityResponse, ?>> entityClientServices, Runnable onClose, long uniqueConnectionID, PassthroughEndpointConnector endpointConnector) {
        this.connectionName = connectionName;
        this.uuid = UUID.randomUUID().toString();
        this.connectionState = new PassthroughConnectionState(serverProcess);
        this.entityClientServices = entityClientServices;
        this.nextClientEndpointID = 1L;
        this.localEndpoints = new HashMap();
        this.onClose = onClose;
        this.uniqueConnectionID = uniqueConnectionID;
        this.endpointConnector = endpointConnector;
        this.readerThreadName = readerThreadName;
        this.messageQueue = new Vector<ServerToClientMessageRecord>();
        this.clientResponseWaitQueue = new Vector<Waiter>();
    }

    public boolean isValid() {
        return this.state == State.RUNNING;
    }

    public void startProcessingRequests() {
        this.clientThread = new Thread(() -> this.runClientThread());
        this.clientThread.setName(this.readerThreadName);
        this.clientThread.setUncaughtExceptionHandler(PassthroughUncaughtExceptionHandler.sharedInstance);
        this.state = State.RUNNING;
        this.clientThread.start();
    }

    public String getConnectionName() {
        return this.connectionName;
    }

    public String getUUID() {
        return this.uuid;
    }

    public long getUniqueConnectionID() {
        return this.uniqueConnectionID;
    }

    public PassthroughWait sendInternalMessageAfterAcks(PassthroughMessage message) {
        boolean shouldWaitForSent = true;
        boolean shouldWaitForReceived = true;
        boolean shouldWaitForCompleted = true;
        boolean shouldWaitForRetired = false;
        boolean forceGetToBlockOnRetire = true;
        return this.invokeAndWait(message, shouldWaitForSent, shouldWaitForReceived, shouldWaitForCompleted, shouldWaitForRetired, forceGetToBlockOnRetire, null);
    }

    public Future<byte[]> invokeActionAndWaitForAcks(PassthroughMessage message, boolean shouldWaitForSent, boolean shouldWaitForReceived, boolean shouldWaitForCompleted, boolean shouldWaitForRetired, boolean forceGetToBlockOnRetire, PassthroughMonitor monitor) {
        return this.invokeAndWait(message, shouldWaitForSent, shouldWaitForReceived, shouldWaitForCompleted, shouldWaitForRetired, forceGetToBlockOnRetire, monitor);
    }

    private PassthroughWait invokeAndWait(PassthroughMessage message, boolean shouldWaitForSent, boolean shouldWaitForReceived, boolean shouldWaitForCompleted, boolean shouldWaitForRetired, boolean forceGetToBlockOnRetire, PassthroughMonitor monitor) {
        if (this.state == State.INIT) {
            throw new IllegalStateException("Connection is not in " + (Object)((Object)State.RUNNING) + " state");
        }
        if (this.state == State.CLOSED) {
            throw new ConnectionClosedException("Connection already closed");
        }
        PassthroughWait waiter = this.connectionState.sendNormal(this, message, shouldWaitForSent, shouldWaitForReceived, shouldWaitForCompleted, shouldWaitForRetired, forceGetToBlockOnRetire, monitor);
        if (Thread.currentThread() == this.clientThread) {
            while (!waiter.isDone() && this.handleNextMessage()) {
            }
        } else {
            waiter.waitForAck();
        }
        return waiter;
    }

    public <T, U> T createEntityInstance(Class<T> cls, String name, long clientInstanceID, long clientSideVersion, byte[] config, U userData) {
        EntityClientService<?, ?, ? extends EntityMessage, ? extends EntityResponse, ?> service = this.getEntityClientService(cls);
        return (T)this.storeNewEndpointAndCreateInstance(cls, name, clientInstanceID, config, service, userData);
    }

    private <M extends EntityMessage, R extends EntityResponse, U> Entity storeNewEndpointAndCreateInstance(Class<?> cls, String name, final long clientInstanceID, byte[] config, EntityClientService<?, ?, M, R, U> service, U userData) {
        Runnable onClose = new Runnable(){

            @Override
            public void run() {
                PassthroughConnection.this.localEndpoints.remove(clientInstanceID);
            }
        };
        PassthroughEntityClientEndpoint endpoint = new PassthroughEntityClientEndpoint(this, cls, name, clientInstanceID, config, service.getMessageCodec(), onClose);
        this.localEndpoints.put(clientInstanceID, endpoint);
        return this.endpointConnector.connect(endpoint, service, userData);
    }

    public synchronized void sendMessageToClient(PassthroughServerProcess sender, byte[] payload) {
        if (this.connectionState.isConnected(sender)) {
            ServerToClientMessageRecord record = new ServerToClientMessageRecord(sender, payload);
            this.messageQueue.add(record);
            this.notifyAll();
        }
    }

    private void runClientThread() {
        while (this.handleNextMessage()) {
        }
    }

    private boolean handleNextMessage() {
        ServerToClientMessageRecord message = this.getNextClientMessage();
        if (message != null) {
            if (this.connectionState.isConnected(message.sender)) {
                this.clientThreadHandleMessage(message.sender, message.payload);
            }
            return true;
        }
        return false;
    }

    private synchronized ServerToClientMessageRecord getNextClientMessage() {
        while (this.state == State.RUNNING) {
            if (!this.messageQueue.isEmpty()) {
                return this.messageQueue.remove(0);
            }
            try {
                this.wait();
            }
            catch (InterruptedException e) {
                Assert.unexpected(e);
            }
        }
        return null;
    }

    private void clientThreadHandleMessage(final PassthroughServerProcess sender, byte[] message) {
        PassthroughMessageCodec.Decoder<Void> decoder = new PassthroughMessageCodec.Decoder<Void>(){

            @Override
            public Void decode(PassthroughMessage.Type type, boolean shouldReplicate, long transactionID, long oldestTransactionID, DataInputStream input) throws IOException {
                switch (type) {
                    case ACK_FROM_SERVER: {
                        PassthroughConnection.this.handleAck(sender, transactionID);
                        break;
                    }
                    case MONITOR_MESSAGE: 
                    case MONITOR_EXCEPTION: 
                    case COMPLETE_FROM_SERVER: 
                    case EXCEPTION_FROM_SERVER: {
                        boolean isSuccess = type != PassthroughMessage.Type.EXCEPTION_FROM_SERVER && type != PassthroughMessage.Type.MONITOR_EXCEPTION;
                        int length = input.readInt();
                        byte[] bytes = null;
                        if (-1 != length) {
                            bytes = new byte[length];
                            input.readFully(bytes);
                        }
                        byte[] result = null;
                        EntityException error = null;
                        if (isSuccess) {
                            result = bytes;
                        } else {
                            error = PassthroughMessageCodec.deserializeExceptionFromArray(bytes);
                        }
                        if (type == PassthroughMessage.Type.MONITOR_MESSAGE) {
                            PassthroughConnection.this.handleMonitor(sender, transactionID, result);
                            break;
                        }
                        PassthroughConnection.this.handleComplete(sender, transactionID, result, error);
                        break;
                    }
                    case RETIRE_FROM_SERVER: {
                        PassthroughConnection.this.handleRetire(sender, transactionID);
                        break;
                    }
                    case INVOKE_ON_CLIENT: {
                        long clientInstanceID = input.readLong();
                        int length = input.readInt();
                        byte[] result = new byte[length];
                        input.readFully(result);
                        try {
                            PassthroughConnection.this.handleInvokeOnClient(clientInstanceID, result);
                        }
                        catch (MessageCodecException e) {
                            Assert.unexpected(e);
                        }
                        ((Waiter)PassthroughConnection.this.clientResponseWaitQueue.remove(0)).finish();
                        break;
                    }
                    case CREATE_ENTITY: 
                    case DESTROY_ENTITY: 
                    case DOES_ENTITY_EXIST: 
                    case FETCH_ENTITY: 
                    case RELEASE_ENTITY: 
                    case INVOKE_ON_SERVER: 
                    case RECONNECT: 
                    case UNEXPECTED_RELEASE: {
                        Assert.unreachable();
                        break;
                    }
                    default: {
                        Assert.unreachable();
                    }
                }
                return null;
            }
        };
        PassthroughMessageCodec.decodeRawMessage(decoder, message);
    }

    private void handleAck(PassthroughServerProcess sender, long transactionID) {
        PassthroughWait waiter = this.connectionState.getWaiterForTransaction(sender, transactionID);
        if (null != waiter) {
            waiter.handleAck();
        }
    }

    private void handleComplete(PassthroughServerProcess sender, long transactionID, byte[] result, EntityException error) {
        PassthroughWait waiter = this.connectionState.getWaiterForTransaction(sender, transactionID);
        if (null != waiter) {
            waiter.handleComplete(result, error);
        }
    }

    private void handleMonitor(PassthroughServerProcess sender, long transactionID, byte[] result) {
        PassthroughWait waiter = this.connectionState.getWaiterForTransaction(sender, transactionID);
        if (null != waiter) {
            waiter.handleMonitor(result);
        }
    }

    private void handleRetire(PassthroughServerProcess sender, long transactionID) {
        PassthroughWait waiter = this.connectionState.removeWaiterForTransaction(sender, transactionID);
        if (null != waiter) {
            waiter.handleRetire();
        }
    }

    private void handleInvokeOnClient(long clientInstanceID, byte[] result) throws MessageCodecException {
        this.localEndpoints.get(clientInstanceID).handleMessageFromServer(result);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() {
        if (this.state == State.INIT) {
            throw new IllegalStateException("Connection is not in " + (Object)((Object)State.RUNNING) + " state");
        }
        if (this.state == State.RUNNING) {
            try {
                for (PassthroughEntityClientEndpoint<?, ?> endpoint : this.localEndpoints.values()) {
                    PassthroughMessage releaseMessage = endpoint.createUnexpectedReleaseMessage();
                    this.sendUnexpectedCloseMessage(releaseMessage);
                }
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            this.connectionState.forceClose();
            Iterator<PassthroughEntityClientEndpoint<?, ?>> iterator = this;
            synchronized (iterator) {
                this.state = State.CLOSED;
                this.notifyAll();
            }
            try {
                this.clientThread.join();
            }
            catch (InterruptedException e) {
                Assert.unexpected(e);
            }
            this.onClose.run();
            for (PassthroughEntityClientEndpoint<?, ?> endpoint : this.localEndpoints.values()) {
                endpoint.didCloseUnexpectedly();
            }
        } else {
            throw new IllegalStateException("Connection already closed");
        }
        this.localEndpoints.clear();
    }

    private void sendUnexpectedCloseMessage(PassthroughMessage message) {
        boolean shouldWaitForSent = false;
        boolean shouldWaitForReceived = false;
        boolean shouldWaitForCompleted = false;
        boolean shouldWaitForRetired = false;
        boolean forceGetToBlockOnRetire = false;
        PassthroughWait received = this.invokeAndWait(message, shouldWaitForSent, shouldWaitForReceived, shouldWaitForCompleted, shouldWaitForRetired, forceGetToBlockOnRetire, null);
        try {
            received.get();
        }
        catch (InterruptedException e) {
            Assert.unexpected(e);
        }
        catch (ExecutionException e) {
            Assert.unexpected(e);
        }
    }

    public EntityRef<?, ?, ?> getEntityRef(String type, long version, String name) {
        Class<?> clazz = this.loadEntityType(type);
        EntityClientService<?, ?, ? extends EntityMessage, ? extends EntityResponse, ?> service = clazz != null ? this.getEntityClientService(clazz) : null;
        return new PassthroughEntityRef(this, service, type, version, name);
    }

    public <T extends Entity, C, U> EntityRef<T, C, U> getEntityRef(Class<T> cls, long version, String name) {
        EntityClientService<?, ?, ? extends EntityMessage, ? extends EntityResponse, ?> service = this.getEntityClientService(cls);
        return new PassthroughEntityRef(this, service, cls.getCanonicalName(), version, name);
    }

    private EntityClientService<?, ?, ? extends EntityMessage, ? extends EntityResponse, ?> getEntityClientService(Class type) {
        EntityClientService<?, ?, ? extends EntityMessage, ? extends EntityResponse, ?> selected = null;
        for (EntityClientService<?, ?, ? extends EntityMessage, ? extends EntityResponse, ?> entityClientService : this.entityClientServices) {
            if (!entityClientService.handlesEntityType(type)) continue;
            Assert.assertTrue(null == selected);
            selected = entityClientService;
        }
        return selected;
    }

    private Class<?> loadEntityType(String typeName) {
        try {
            return Class.forName(typeName);
        }
        catch (ClassNotFoundException notfound) {
            return null;
        }
    }

    public synchronized long getNewInstanceID() {
        long thisClientEndpointID = this.nextClientEndpointID++;
        return thisClientEndpointID;
    }

    public synchronized Future<Void> createClientResponseFuture() {
        Waiter waiter = new Waiter();
        this.clientResponseWaitQueue.add(waiter);
        return waiter;
    }

    public void startReconnect(PassthroughServerProcess serverProcess) {
        Assert.assertTrue(null == this.waitersToResend);
        this.waitersToResend = new HashMap<Long, PassthroughWait>(this.connectionState.enterReconnectState(serverProcess));
        for (PassthroughEntityClientEndpoint<?, ?> endpoint : this.localEndpoints.values()) {
            byte[] extendedData = endpoint.getExtendedReconnectData();
            PassthroughMessage message = endpoint.buildReconnectMessage(extendedData);
            boolean shouldWaitForSent = true;
            boolean shouldWaitForReceived = true;
            boolean shouldWaitForCompleted = true;
            boolean shouldWaitForRetired = true;
            boolean forceGetToBlockOnRetire = true;
            PassthroughWait waiter = this.connectionState.sendAsReconnect(this, message, shouldWaitForSent, shouldWaitForReceived, shouldWaitForCompleted, shouldWaitForRetired, forceGetToBlockOnRetire);
            waiter.waitForAck();
        }
    }

    public void finishReconnect() {
        Assert.assertTrue(null != this.waitersToResend);
        for (Map.Entry<Long, PassthroughWait> entry : this.waitersToResend.entrySet()) {
            long transactionID = entry.getKey();
            PassthroughWait waiter = entry.getValue();
            this.connectionState.sendAsResend(this, transactionID, waiter);
        }
        this.connectionState.finishReconnectState();
        this.waitersToResend = null;
    }

    public void disconnect() {
        this.connectionState.enterDisconnectedState();
    }

    private static enum State {
        INIT,
        RUNNING,
        CLOSED;

    }

    private static class ServerToClientMessageRecord {
        public final PassthroughServerProcess sender;
        public final byte[] payload;

        public ServerToClientMessageRecord(PassthroughServerProcess sender, byte[] payload) {
            this.sender = sender;
            this.payload = payload;
        }
    }

    private static class Waiter
    implements Future<Void> {
        private boolean isDone = false;

        private Waiter() {
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            Assert.unreachable();
            return false;
        }

        @Override
        public synchronized Void get() throws InterruptedException, ExecutionException {
            while (!this.isDone) {
                this.wait();
            }
            return null;
        }

        @Override
        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            Assert.unreachable();
            return null;
        }

        @Override
        public boolean isCancelled() {
            Assert.unreachable();
            return false;
        }

        @Override
        public synchronized boolean isDone() {
            return this.isDone;
        }

        public synchronized void finish() {
            this.isDone = true;
            this.notifyAll();
        }
    }
}

