/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.spi.impl;

import com.hazelcast.client.HazelcastClientNotActiveException;
import com.hazelcast.client.connection.ClientConnectionManager;
import com.hazelcast.client.connection.nio.ClientConnection;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.ClientMessageType;
import com.hazelcast.client.impl.protocol.parameters.ExceptionResultParameters;
import com.hazelcast.client.impl.protocol.parameters.RemoveAllListenersParameters;
import com.hazelcast.client.spi.ClientExecutionService;
import com.hazelcast.client.spi.ClientInvocationService;
import com.hazelcast.client.spi.ClientPartitionService;
import com.hazelcast.client.spi.EventHandler;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.ConnectionHeartbeatListener;
import com.hazelcast.instance.OutOfMemoryErrorDispatcher;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.ConnectionListener;
import com.hazelcast.nio.SocketWritable;
import com.hazelcast.spi.exception.TargetDisconnectedException;
import com.hazelcast.util.ConstructorFunction;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

abstract class ClientInvocationServiceSupport
implements ClientInvocationService,
ConnectionHeartbeatListener,
ConnectionListener {
    private static final int WAIT_TIME_FOR_PACKETS_TO_BE_CONSUMED = 10;
    private static final int WAIT_TIME_FOR_PACKETS_TO_BE_CONSUMED_THRESHOLD = 5000;
    protected final HazelcastClientInstanceImpl client;
    protected final ClientConnectionManager connectionManager;
    protected final ClientPartitionService partitionService;
    protected final ClientExecutionService executionService;
    private final ILogger logger = Logger.getLogger(ClientInvocationService.class);
    private final ResponseThread responseThread;
    private final ConcurrentMap<Integer, ClientInvocation> callIdMap = new ConcurrentHashMap<Integer, ClientInvocation>();
    private final ConcurrentMap<Integer, ClientInvocation> eventHandlerMap = new ConcurrentHashMap<Integer, ClientInvocation>();
    private final AtomicInteger callIdIncrementer = new AtomicInteger();
    private volatile boolean isShutdown;

    public ClientInvocationServiceSupport(HazelcastClientInstanceImpl client) {
        this.client = client;
        this.connectionManager = client.getConnectionManager();
        this.executionService = client.getClientExecutionService();
        this.connectionManager.addConnectionListener(this);
        this.connectionManager.addConnectionHeartbeatListener(this);
        this.partitionService = client.getClientPartitionService();
        this.responseThread = new ResponseThread(client.getThreadGroup(), client.getName() + ".response-", client.getClientConfig().getClassLoader());
        this.responseThread.start();
    }

    @Override
    public boolean isRedoOperation() {
        return this.client.getClientConfig().getNetworkConfig().isRedoOperation();
    }

    protected void send(ClientInvocation invocation, ClientConnection connection) throws IOException {
        if (this.isShutdown) {
            throw new HazelcastClientNotActiveException("Client is shut down");
        }
        this.registerInvocation(invocation);
        ClientMessage clientMessage = invocation.getClientMessage();
        if (!this.isAllowedToSendRequest(connection, invocation) || !this.writeToConnection(connection, clientMessage)) {
            int callId = clientMessage.getCorrelationId();
            this.deRegisterCallId(callId);
            this.deRegisterEventHandler(callId);
            throw new IOException("Packet not send to " + connection.getRemoteEndpoint());
        }
        invocation.setSendConnection(connection);
    }

    private boolean writeToConnection(ClientConnection connection, ClientMessage clientMessage) {
        clientMessage.addFlag((short)192);
        return connection.write((SocketWritable)clientMessage);
    }

    private boolean isAllowedToSendRequest(ClientConnection connection, ClientInvocation invocation) {
        if (!connection.isHeartBeating()) {
            if (invocation.shouldBypassHeartbeatCheck()) {
                return true;
            }
            if (this.logger.isFinestEnabled()) {
                this.logger.warning("Connection is not heart-beating, won't write client message -> " + invocation.getClientMessage());
            }
            return false;
        }
        return true;
    }

    private void registerInvocation(ClientInvocation clientInvocation) {
        short protocolVersion = this.client.getProtocolVersion();
        int correlationId = this.newCorrelationId();
        clientInvocation.getClientMessage().setCorrelationId(correlationId).setVersion(protocolVersion);
        this.callIdMap.put(correlationId, clientInvocation);
        if (clientInvocation.getHandler() != null) {
            this.eventHandlerMap.put(correlationId, clientInvocation);
        }
    }

    private ClientInvocation deRegisterCallId(int callId) {
        return (ClientInvocation)this.callIdMap.remove(callId);
    }

    private ClientInvocation deRegisterEventHandler(int callId) {
        return (ClientInvocation)this.eventHandlerMap.remove(callId);
    }

    @Override
    public EventHandler getEventHandler(int callId) {
        ClientInvocation clientInvocation = (ClientInvocation)this.eventHandlerMap.get(callId);
        if (clientInvocation == null) {
            return null;
        }
        return clientInvocation.getHandler();
    }

    @Override
    public boolean removeEventHandler(Integer callId) {
        if (callId != null) {
            return this.eventHandlerMap.remove(callId) != null;
        }
        return false;
    }

    public void cleanResources(ConstructorFunction<Object, Throwable> responseCtor, ClientConnection connection) {
        ClientInvocation invocation;
        Iterator iter = this.callIdMap.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry entry = iter.next();
            invocation = (ClientInvocation)entry.getValue();
            if (!invocation.getSendConnection().equals(connection)) continue;
            iter.remove();
            invocation.notifyException((Throwable)responseCtor.createNew(null));
            this.eventHandlerMap.remove(entry.getKey());
        }
        Iterator iterator = this.eventHandlerMap.values().iterator();
        while (iterator.hasNext()) {
            invocation = (ClientInvocation)iterator.next();
            if (!invocation.getSendConnection().equals(connection)) continue;
            iterator.remove();
            invocation.notifyException((Throwable)responseCtor.createNew(null));
        }
    }

    @Override
    public void heartBeatStarted(Connection connection) {
    }

    @Override
    public void heartBeatStopped(Connection connection) {
        ClientMessage request = RemoveAllListenersParameters.encode();
        ClientInvocation removeListenerInvocation = new ClientInvocation(this.client, request, connection);
        removeListenerInvocation.setBypassHeartbeatCheck(true);
        removeListenerInvocation.invoke();
        Address remoteEndpoint = connection.getEndPoint();
        Iterator iterator = this.eventHandlerMap.values().iterator();
        TargetDisconnectedException response = new TargetDisconnectedException(remoteEndpoint);
        while (iterator.hasNext()) {
            ClientInvocation clientInvocation = (ClientInvocation)iterator.next();
            if (!clientInvocation.getSendConnection().equals(connection)) continue;
            iterator.remove();
            clientInvocation.notifyException((Throwable)response);
        }
    }

    public void connectionAdded(Connection connection) {
    }

    public void connectionRemoved(Connection connection) {
        this.cleanConnectionResources((ClientConnection)connection);
    }

    @Override
    public void cleanConnectionResources(ClientConnection connection) {
        if (this.connectionManager.isAlive()) {
            try {
                this.executionService.execute(new CleanResourcesTask(connection));
            }
            catch (RejectedExecutionException e) {
                this.logger.warning("Execution rejected ", (Throwable)e);
            }
        } else {
            this.cleanResources(new ConstructorFunction<Object, Throwable>(){

                public Throwable createNew(Object arg) {
                    return new HazelcastClientNotActiveException("Client is shutting down!");
                }
            }, connection);
        }
    }

    @Override
    public void shutdown() {
        this.isShutdown = true;
        this.responseThread.interrupt();
    }

    @Override
    public void handleClientMessage(ClientMessage message, Connection connection) {
        this.responseThread.workQueue.add(new ClientPacket((ClientConnection)connection, message));
    }

    private int newCorrelationId() {
        return this.callIdIncrementer.incrementAndGet();
    }

    private class ResponseThread
    extends Thread {
        private final BlockingQueue<ClientPacket> workQueue;

        public ResponseThread(ThreadGroup threadGroup, String name, ClassLoader classLoader) {
            super(threadGroup, name);
            this.workQueue = new LinkedBlockingQueue<ClientPacket>();
            this.setContextClassLoader(classLoader);
        }

        @Override
        public void run() {
            try {
                this.doRun();
            }
            catch (OutOfMemoryError e) {
                OutOfMemoryErrorDispatcher.onOutOfMemory((OutOfMemoryError)e);
            }
            catch (Throwable t) {
                ClientInvocationServiceSupport.this.logger.severe(t);
            }
        }

        private void doRun() {
            while (true) {
                ClientPacket task;
                try {
                    task = this.workQueue.take();
                }
                catch (InterruptedException e) {
                    if (!ClientInvocationServiceSupport.this.isShutdown) continue;
                    return;
                }
                if (ClientInvocationServiceSupport.this.isShutdown) {
                    return;
                }
                this.process(task);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void process(ClientPacket packet) {
            ClientConnection conn = packet.getClientConnection();
            try {
                this.handleClientMessage(packet.getClientMessage());
            }
            catch (Exception e) {
                ClientInvocationServiceSupport.this.logger.severe("Failed to process task: " + packet + " on responseThread :" + this.getName(), (Throwable)e);
            }
            finally {
                conn.decrementPacketCount();
            }
        }

        private void handleClientMessage(ClientMessage clientMessage) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
            int correlationId = clientMessage.getCorrelationId();
            ClientInvocation future = ClientInvocationServiceSupport.this.deRegisterCallId(correlationId);
            if (future == null) {
                ClientInvocationServiceSupport.this.logger.warning("No call for callId: " + correlationId + ", response: " + clientMessage);
                return;
            }
            if (ClientMessageType.EXCEPTION.id() == clientMessage.getMessageType()) {
                Throwable exception;
                boolean hasCause;
                ExceptionResultParameters exceptionResultParameters = ExceptionResultParameters.decode((ClientMessage)clientMessage);
                boolean bl = hasCause = !exceptionResultParameters.causeClassName.equals("null");
                if (hasCause) {
                    Class<?> causeClazz = Class.forName(exceptionResultParameters.causeClassName);
                    Constructor<?> causeConstructor = causeClazz.getDeclaredConstructor(String.class);
                    causeConstructor.setAccessible(true);
                    Throwable cause = (Throwable)causeConstructor.newInstance(exceptionResultParameters.message);
                    Class<?> clazz = Class.forName(exceptionResultParameters.className);
                    Constructor<?> constructor = clazz.getDeclaredConstructor(String.class, Throwable.class);
                    constructor.setAccessible(true);
                    exception = (Throwable)constructor.newInstance(exceptionResultParameters.message, cause);
                } else {
                    Class<?> clazz = Class.forName(exceptionResultParameters.className);
                    Constructor<?> constructor = clazz.getDeclaredConstructor(String.class);
                    constructor.setAccessible(true);
                    exception = (Throwable)constructor.newInstance(exceptionResultParameters.message);
                }
                future.notifyException(exception);
            } else {
                future.notify(clientMessage);
            }
        }
    }

    private static class ClientPacket {
        private final ClientConnection clientConnection;
        private final ClientMessage clientMessage;

        public ClientPacket(ClientConnection clientConnection, ClientMessage clientMessage) {
            this.clientConnection = clientConnection;
            this.clientMessage = clientMessage;
        }

        public ClientConnection getClientConnection() {
            return this.clientConnection;
        }

        public ClientMessage getClientMessage() {
            return this.clientMessage;
        }
    }

    private class CleanResourcesTask
    implements Runnable {
        private final ClientConnection connection;

        CleanResourcesTask(ClientConnection connection) {
            this.connection = connection;
        }

        @Override
        public void run() {
            this.waitForPacketsProcessed();
            ClientInvocationServiceSupport.this.cleanResources(new ConstructorFunction<Object, Throwable>(){

                public Throwable createNew(Object arg) {
                    return new TargetDisconnectedException(CleanResourcesTask.this.connection.getRemoteEndpoint());
                }
            }, this.connection);
        }

        private void waitForPacketsProcessed() {
            long begin = System.currentTimeMillis();
            int count = this.connection.getPacketCount();
            while (count != 0) {
                try {
                    Thread.sleep(10L);
                }
                catch (InterruptedException e) {
                    ClientInvocationServiceSupport.this.logger.warning((Throwable)e);
                    break;
                }
                long elapsed = System.currentTimeMillis() - begin;
                if (elapsed > 5000L) {
                    ClientInvocationServiceSupport.this.logger.warning("There are packets which are not processed " + count);
                    break;
                }
                count = this.connection.getPacketCount();
            }
        }
    }
}

