/*
 * Decompiled with CFR 0.152.
 */
package com.tc.object;

import com.tc.bytes.TCByteBufferFactory;
import com.tc.entity.NetworkVoltronEntityMessage;
import com.tc.entity.ResendVoltronEntityMessage;
import com.tc.entity.VoltronEntityMessage;
import com.tc.entity.VoltronEntityMultiResponse;
import com.tc.entity.VoltronEntityResponse;
import com.tc.exception.EntityBusyException;
import com.tc.exception.EntityReferencedException;
import com.tc.exception.WrappedEntityException;
import com.tc.logging.ClientIDLogger;
import com.tc.net.ClientID;
import com.tc.net.NodeID;
import com.tc.net.ServerID;
import com.tc.net.protocol.tcm.ClientMessageChannel;
import com.tc.net.protocol.tcm.MessageChannel;
import com.tc.net.protocol.tcm.NetworkRecall;
import com.tc.net.protocol.tcm.TCMessageType;
import com.tc.net.protocol.tcm.UnknownNameException;
import com.tc.object.ClientEntityManager;
import com.tc.object.ClientEntityStateManager;
import com.tc.object.ClientInstanceID;
import com.tc.object.EntityClientEndpointImpl;
import com.tc.object.EntityDescriptor;
import com.tc.object.EntityID;
import com.tc.object.FetchID;
import com.tc.object.InFlightMessage;
import com.tc.object.SafeInvocationCallback;
import com.tc.object.TransactionSource;
import com.tc.object.msg.ClientEntityReferenceContext;
import com.tc.object.msg.ClientHandshakeMessage;
import com.tc.object.session.SessionID;
import com.tc.object.tx.TransactionID;
import com.tc.text.MapListPrettyPrint;
import com.tc.text.PrettyPrintable;
import com.tc.util.Assert;
import com.tc.util.Throwables;
import com.tc.util.Util;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.entity.EntityClientEndpoint;
import org.terracotta.entity.EntityMessage;
import org.terracotta.entity.EntityResponse;
import org.terracotta.entity.Invocation;
import org.terracotta.entity.InvocationCallback;
import org.terracotta.entity.MessageCodec;
import org.terracotta.entity.MessageCodecException;
import org.terracotta.exception.ConnectionClosedException;
import org.terracotta.exception.EntityException;
import org.terracotta.exception.EntityNotFoundException;
import org.terracotta.exception.EntityServerUncaughtException;

public class ClientEntityManagerImpl
implements ClientEntityManager {
    private final Logger logger;
    private final ClientMessageChannel channel;
    private final ConcurrentMap<TransactionID, InFlightMessage> inFlightMessages;
    private final TransactionSource transactionSource;
    private final ClientEntityStateManager stateManager;
    private final ConcurrentMap<ClientInstanceID, EntityClientEndpointImpl<?, ?>> objectStoreMap;
    private final ExecutorService endpointCloser = Executors.newWorkStealingPool();
    private final LongAdder msgCount = new LongAdder();
    private final LongAdder inflights = new LongAdder();
    private final LongAdder addWindow = new LongAdder();

    public ClientEntityManagerImpl(ClientMessageChannel channel) {
        this.channel = channel;
        this.logger = new ClientIDLogger(() -> channel.getClientID(), LoggerFactory.getLogger(ClientEntityManager.class));
        this.inFlightMessages = new ConcurrentHashMap<TransactionID, InFlightMessage>();
        this.transactionSource = new TransactionSource();
        this.stateManager = new ClientEntityStateManager();
        this.objectStoreMap = new ConcurrentHashMap(10240, 0.75f, 128);
    }

    @Override
    public synchronized boolean isValid() {
        return !this.stateManager.isShutdown() && this.channel.isOpen();
    }

    private boolean enqueueMessage(InFlightMessage msg) throws RejectedExecutionException {
        if (this.stateManager.isShutdown()) {
            return false;
        }
        this.inFlightMessages.put(msg.getTransactionID(), msg);
        return true;
    }

    @Override
    public EntityClientEndpoint fetchEntity(EntityID entity, long version, ClientInstanceID instance, MessageCodec<? extends EntityMessage, ? extends EntityResponse> codec) throws EntityException {
        return this.internalLookup(entity, version, instance, codec);
    }

    @Override
    public void handleMessage(TransactionID tid, byte[] message) {
        InFlightMessage msg = (InFlightMessage)this.inFlightMessages.get(tid);
        if (msg != null) {
            msg.handleMessage(message);
        } else {
            this.logger.info("transaction " + tid + " not found. Ignoring message.");
        }
    }

    @Override
    public void handleStatistics(TransactionID tid, long[] message) {
        InFlightMessage msg = (InFlightMessage)this.inFlightMessages.get(tid);
        if (msg != null) {
            msg.addServerStatistics(message);
        } else {
            this.addWindow.add(message[0]);
            if (message[0] > TimeUnit.MILLISECONDS.toNanos(500L)) {
                this.logger.debug("add window " + message[0]);
            }
        }
    }

    @Override
    public void handleMessage(ClientInstanceID clientInstance, byte[] message) {
        EntityClientEndpoint endpoint = (EntityClientEndpoint)this.objectStoreMap.get(clientInstance);
        if (endpoint != null) {
            this.deliverInboundMessage(endpoint, message);
        } else {
            this.logger.info("Instance " + clientInstance + " not found. Ignoring message.");
        }
    }

    private void deliverInboundMessage(EntityClientEndpoint endpoint, byte[] msg) {
        EntityClientEndpointImpl endpointImpl = (EntityClientEndpointImpl)endpoint;
        try {
            endpointImpl.handleMessage(msg);
        }
        catch (MessageCodecException e) {
            Assert.fail(e.getLocalizedMessage());
        }
    }

    @Override
    public byte[] createEntity(EntityID entityID, long version, byte[] config) throws EntityException {
        return this.lifecycleAndRetire(entityID, version, VoltronEntityMessage.Type.CREATE_ENTITY, config);
    }

    @Override
    public byte[] reconfigureEntity(EntityID entityID, long version, byte[] config) throws EntityException {
        return this.lifecycleAndRetire(entityID, version, VoltronEntityMessage.Type.RECONFIGURE_ENTITY, config);
    }

    private byte[] lifecycleAndComplete(EntityID entityId, EntityDescriptor entityDescriptor, VoltronEntityMessage.Type type) throws EntityException {
        return this.retryingWhileBusy(entityId, () -> Invocation.uninterruptiblyGet(this.lifecycle(entityId, entityDescriptor, type, new byte[0]).invoke(), EntityException.class));
    }

    private byte[] lifecycleAndRetire(EntityID entityId, long version, VoltronEntityMessage.Type type, byte[] message) throws EntityException {
        return this.retryingWhileBusy(entityId, () -> {
            Invocation<byte[]> builder = this.lifecycle(entityId, EntityDescriptor.createDescriptorForLifecycle(entityId, version), type, message);
            return Invocation.uninterruptiblyGet(builder.invokeAndRetire(), EntityException.class);
        });
    }

    private Invocation<byte[]> lifecycle(EntityID entityID, EntityDescriptor entityDescriptor, VoltronEntityMessage.Type type, byte[] message) {
        return (callback, callbacks) -> this.invoke(entityID, entityDescriptor, callbacks, SafeInvocationCallback.safe(callback), true, type, message);
    }

    @Override
    public boolean destroyEntity(EntityID entityID, long version) throws EntityException {
        try {
            this.lifecycleAndRetire(entityID, version, VoltronEntityMessage.Type.DESTROY_ENTITY, new byte[0]);
            return true;
        }
        catch (EntityReferencedException r) {
            return false;
        }
    }

    private Set<VoltronEntityMessage.Acks> makeServerAcks(Set<InvocationCallback.Types> requestedCallbacks) {
        return requestedCallbacks.stream().map(callback -> {
            switch (callback) {
                case SENT: {
                    return VoltronEntityMessage.Acks.SENT;
                }
                case RECEIVED: {
                    return VoltronEntityMessage.Acks.RECEIVED;
                }
                case COMPLETE: {
                    return VoltronEntityMessage.Acks.COMPLETED;
                }
                case RETIRED: {
                    return VoltronEntityMessage.Acks.RETIRED;
                }
            }
            return null;
        }).filter(Objects::nonNull).collect(Collectors.toCollection(() -> EnumSet.noneOf(VoltronEntityMessage.Acks.class)));
    }

    @Override
    public Invocation.Task invokeAction(EntityID eid, EntityDescriptor entityDescriptor, Set<InvocationCallback.Types> requestedCallbacks, SafeInvocationCallback<byte[]> callback, boolean requiresReplication, byte[] payload) {
        return this.invoke(eid, entityDescriptor, requestedCallbacks, callback, requiresReplication, VoltronEntityMessage.Type.INVOKE_ACTION, payload);
    }

    private Invocation.Task invoke(EntityID eid, EntityDescriptor entityDescriptor, Set<InvocationCallback.Types> requestedCallbacks, SafeInvocationCallback<byte[]> callback, boolean requiresReplication, VoltronEntityMessage.Type type, byte[] payload) {
        Set<VoltronEntityMessage.Acks> requestedAcks = this.makeServerAcks(requestedCallbacks);
        return this.queueInFlightMessage(eid, () -> this.createMessageWithDescriptor(eid, entityDescriptor, requiresReplication, payload, type, requestedAcks), callback);
    }

    @Override
    public Map<String, ?> getStateMap() {
        LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();
        for (EntityClientEndpointImpl s : this.objectStoreMap.values()) {
            map.put(s.getEntityID().toString(), s.getStatistics().getStateMap());
        }
        map.put("messagesOut", this.msgCount.sum());
        if (this.msgCount.sum() > 0L) {
            map.put("averagePending", this.inflights.sum() / this.msgCount.sum());
            map.put("averageServerWindow", this.addWindow.sum() / this.msgCount.sum());
        }
        Object stats = this.channel.getAttachment("ChannelStats");
        LinkedHashMap<String, Object> sub = new LinkedHashMap<String, Object>();
        sub.put("connection", this.channel.getConnectionID());
        sub.put("local", this.channel.getLocalAddress());
        sub.put("remote", this.channel.getRemoteAddress());
        sub.put("product", (Object)this.channel.getProductID());
        sub.put("client", this.channel.getClientID());
        if (this.stateManager.isShutdown()) {
            sub.put("pendingMessages", "<shutdown>");
        } else {
            sub.put("pendingMessages", this.inFlightMessages.size());
        }
        map.put("channel", sub);
        if (stats instanceof PrettyPrintable) {
            sub.put("stats", ((PrettyPrintable)stats).getStateMap());
        }
        return map;
    }

    @Override
    public void received(TransactionID id) {
        InFlightMessage inFlight = (InFlightMessage)this.inFlightMessages.get(id);
        if (inFlight != null) {
            inFlight.received();
        }
    }

    @Override
    public void complete(TransactionID id) {
        this.complete(id, null);
    }

    @Override
    public void complete(TransactionID id, byte[] value) {
        InFlightMessage inFlight = (InFlightMessage)this.inFlightMessages.get(id);
        if (inFlight != null) {
            inFlight.setResult(value, null);
        }
    }

    @Override
    public void failed(TransactionID id, Exception error) {
        InFlightMessage inFlight = (InFlightMessage)this.inFlightMessages.get(id);
        if (inFlight != null) {
            inFlight.setResult(null, error);
        }
    }

    @Override
    public void retired(TransactionID id) {
        try {
            InFlightMessage inFlight = (InFlightMessage)this.inFlightMessages.remove(id);
            if (inFlight != null) {
                inFlight.retired();
            }
        }
        finally {
            this.transactionSource.retire(id);
        }
    }

    @Override
    public synchronized void pause() {
        this.stateManager.pause();
    }

    @Override
    public synchronized void unpause() {
        this.stateManager.running();
        this.notifyAll();
    }

    @Override
    public synchronized void initializeHandshake(ClientHandshakeMessage handshakeMessage) {
        for (EntityClientEndpointImpl endpoint : this.objectStoreMap.values()) {
            EntityDescriptor descriptor = endpoint.getEntityDescriptor();
            EntityID entityID = endpoint.getEntityID();
            long entityVersion = endpoint.getVersion();
            byte[] extendedReconnectData = endpoint.getExtendedReconnectData();
            ClientEntityReferenceContext context = new ClientEntityReferenceContext(entityID, entityVersion, descriptor.getClientInstanceID(), extendedReconnectData);
            handshakeMessage.addReconnectReference(context);
        }
        for (InFlightMessage inFlight : this.inFlightMessages.values()) {
            if (!inFlight.commit()) continue;
            VoltronEntityMessage message = inFlight.getMessage();
            ResendVoltronEntityMessage packaged = new ResendVoltronEntityMessage(message.getSource(), message.getTransactionID(), message.getEntityDescriptor(), message.getVoltronType(), message.doesRequireReplication(), message.getExtendedData());
            handshakeMessage.addResendMessage(packaged);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void shutdown() {
        Iterator iterator = this;
        synchronized (iterator) {
            if (this.stateManager.isShutdown()) {
                return;
            }
            this.stateManager.stop();
            this.notifyAll();
        }
        for (InFlightMessage msg : this.inFlightMessages.values()) {
            this.throwClosedExceptionOnMessage(msg, "Connection closed under in-flight message");
        }
        for (EntityClientEndpointImpl endpoint : this.objectStoreMap.values()) {
            try {
                endpoint.didCloseUnexpectedly();
            }
            catch (Throwable t) {
                this.logger.error("error in shutdown", t);
            }
        }
        this.endpointCloser.shutdownNow();
        if (this.logger.isDebugEnabled()) {
            MapListPrettyPrint print = new MapListPrettyPrint();
            this.prettyPrint(print);
            this.logger.debug(print.toString());
        }
        this.objectStoreMap.clear();
    }

    private void throwClosedExceptionOnMessage(InFlightMessage msg, String description) {
        msg.received();
        ConnectionClosedException closed = new ConnectionClosedException(msg.getEntityID().getClassName(), msg.getEntityID().getEntityName(), description, false, null);
        msg.setResult(null, closed);
        msg.retired();
        this.inFlightMessages.remove(msg.getTransactionID());
        this.transactionSource.retire(msg.getTransactionID());
    }

    private <M extends EntityMessage, R extends EntityResponse> EntityClientEndpoint<M, R> internalLookup(final EntityID entity, long version, ClientInstanceID instance, MessageCodec<M, R> codec) throws EntityException {
        Assert.assertNotNull("Can't lookup null entity descriptor", instance);
        final EntityDescriptor fetchDescriptor = EntityDescriptor.createDescriptorForFetch(entity, version, instance);
        EntityClientEndpointImpl<M, R> resolvedEndpoint = null;
        try {
            byte[] raw = this.internalRetrieve(fetchDescriptor);
            ByteBuffer br = ByteBuffer.wrap(raw);
            long fetchID = br.getLong();
            FetchID fetch = new FetchID(fetchID);
            byte[] config = new byte[br.remaining()];
            br.get(config);
            Assert.assertTrue(null != raw);
            Runnable compoundRunnable = new Runnable(){

                @Override
                public void run() {
                    try {
                        ClientEntityManagerImpl.this.internalRelease(entity, fetchDescriptor);
                    }
                    catch (EntityException e) {
                        Util.printLogAndRethrowError(e, ClientEntityManagerImpl.this.logger);
                    }
                }
            };
            resolvedEndpoint = new EntityClientEndpointImpl<M, R>(entity, version, EntityDescriptor.createDescriptorForInvoke(fetch, instance), this, config, codec, compoundRunnable, this.endpointCloser);
            if (null != this.objectStoreMap.get(instance)) {
                throw Assert.failure("Attempt to add an object that already exists: Object of class " + resolvedEndpoint.getClass() + " [Identity Hashcode : 0x" + Integer.toHexString(System.identityHashCode(resolvedEndpoint)) + "] ");
            }
            this.objectStoreMap.put(instance, resolvedEndpoint);
        }
        catch (EntityNotFoundException notfound) {
            throw notfound;
        }
        catch (EntityException e) {
            this.internalRelease(entity, fetchDescriptor);
            throw e;
        }
        catch (Throwable t) {
            this.internalRelease(entity, fetchDescriptor);
            throw Throwables.propagate(t);
        }
        return resolvedEndpoint;
    }

    private void internalRelease(EntityID entityId, EntityDescriptor entityDescriptor) throws EntityException {
        this.lifecycleAndComplete(entityId, entityDescriptor, VoltronEntityMessage.Type.RELEASE_ENTITY);
        EntityClientEndpointImpl ref = (EntityClientEndpointImpl)this.objectStoreMap.remove(entityDescriptor.getClientInstanceID());
        if (ref != null && this.logger.isDebugEnabled()) {
            MapListPrettyPrint print = new MapListPrettyPrint();
            ref.getStatistics().prettyPrint(print);
            this.logger.debug("Releasing " + ref.getEntityID() + "=" + print.toString());
        }
    }

    private byte[] internalRetrieve(EntityDescriptor entityDescriptor) throws EntityException {
        return this.lifecycleAndComplete(entityDescriptor.getEntityID(), entityDescriptor, VoltronEntityMessage.Type.FETCH_ENTITY);
    }

    private Invocation.Task queueInFlightMessage(EntityID eid, Supplier<NetworkVoltronEntityMessage> message, SafeInvocationCallback<byte[]> callback) {
        try {
            boolean queued;
            InFlightMessage inFlight = new InFlightMessage(eid, message, callback);
            try {
                this.msgCount.increment();
                this.inflights.add(this.inFlightMessages.size());
                queued = this.enqueueMessage(inFlight);
            }
            catch (Throwable t) {
                this.transactionSource.retire(inFlight.getTransactionID());
                throw t;
            }
            if (queued && !this.stateManager.isShutdown()) {
                inFlight.sent();
                if (!inFlight.send()) {
                    this.logger.debug("message not sent.  Make sure resend happens " + inFlight);
                    if (!this.channel.getProductID().isReconnectEnabled()) {
                        this.throwClosedExceptionOnMessage(inFlight, "connection not capable of resend");
                    }
                }
            } else {
                this.throwClosedExceptionOnMessage(inFlight, "Connection closed before sending message");
            }
            return () -> {
                if (inFlight.cancel()) {
                    this.inFlightMessages.remove(inFlight.getTransactionID(), inFlight);
                    return true;
                }
                return false;
            };
        }
        catch (ConnectionClosedException e) {
            callback.sent();
            callback.failure(e);
            callback.complete();
            callback.retired();
            return () -> false;
        }
    }

    private NetworkVoltronEntityMessage createMessageWithoutClientInstance(EntityID entityID, long version, boolean requiresReplication, byte[] config, VoltronEntityMessage.Type type, Set<VoltronEntityMessage.Acks> acks) {
        EntityDescriptor entityDescriptor = EntityDescriptor.createDescriptorForLifecycle(entityID, version);
        return this.createMessageWithDescriptor(entityID, entityDescriptor, requiresReplication, config, type, acks);
    }

    private NetworkVoltronEntityMessage createMessageWithDescriptor(EntityID entityID, EntityDescriptor entityDescriptor, boolean requiresReplication, byte[] config, VoltronEntityMessage.Type type, Set<VoltronEntityMessage.Acks> acks) {
        NetworkVoltronEntityMessage message = (NetworkVoltronEntityMessage)this.channel.createMessage(TCMessageType.VOLTRON_ENTITY_MESSAGE);
        ClientID clientID = this.channel.getClientID();
        TransactionID transactionID = this.transactionSource.create();
        TransactionID oldestTransactionPending = this.transactionSource.oldest();
        message.setContents(clientID, transactionID, entityID, entityDescriptor, type, requiresReplication, TCByteBufferFactory.wrap(config), oldestTransactionPending, acks);
        return message;
    }

    private <T> T retryingWhileBusy(EntityID entityID, EntityLifecycleTask<T> task) throws EntityException {
        while (true) {
            try {
                return task.run();
            }
            catch (EntityBusyException busy) {
                this.logger.info("Cluster is busy. Requested operation will be retried in 2 seconds");
                try {
                    TimeUnit.SECONDS.sleep(2L);
                }
                catch (InterruptedException in) {
                    throw new WrappedEntityException(new EntityServerUncaughtException(entityID.getClassName(), entityID.getEntityName(), "", in));
                }
            }
        }
    }

    private static interface EntityLifecycleTask<T> {
        public T run() throws EntityException;
    }

    private static class FlushResponse
    implements VoltronEntityResponse,
    VoltronEntityMultiResponse {
        private boolean accessed = false;

        private FlushResponse() {
        }

        @Override
        public synchronized TransactionID getTransactionID() {
            this.notifyAll();
            this.accessed = true;
            return TransactionID.NULL_ID;
        }

        @Override
        public VoltronEntityMessage.Acks getAckType() {
            return VoltronEntityMessage.Acks.RECEIVED;
        }

        @Override
        public TCMessageType getMessageType() {
            return TCMessageType.VOLTRON_ENTITY_RECEIVED_RESPONSE;
        }

        @Override
        public void hydrate() throws IOException, UnknownNameException {
        }

        @Override
        public synchronized int replay(VoltronEntityMultiResponse.ReplayReceiver receiver) {
            this.notifyAll();
            this.accessed = true;
            return 0;
        }

        @Override
        public NetworkRecall send() {
            return null;
        }

        @Override
        public MessageChannel getChannel() {
            return null;
        }

        @Override
        public NodeID getSourceNodeID() {
            return ServerID.NULL_ID;
        }

        @Override
        public NodeID getDestinationNodeID() {
            return ClientID.NULL_ID;
        }

        @Override
        public SessionID getLocalSessionID() {
            return SessionID.NULL_ID;
        }

        @Override
        public int getMessageLength() {
            return 0;
        }

        @Override
        public boolean addReceived(TransactionID tid) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addRetired(TransactionID tid) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addResult(TransactionID tid, byte[] result) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addResultAndRetire(TransactionID tid, byte[] result) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addServerMessage(TransactionID cid, byte[] message) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addServerMessage(ClientInstanceID cid, byte[] message) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean addStats(TransactionID cid, long[] timings) {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public void stopAdding() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        @Override
        public boolean startAdding() {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        public synchronized void waitForAccess() {
            boolean interrupted = Thread.interrupted();
            while (!this.accessed) {
                try {
                    this.wait();
                }
                catch (InterruptedException ie) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

