/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.spi.impl;

import com.hazelcast.client.connection.nio.ClientConnection;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.spi.impl.AbstractClientInvocationService;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.ClientResponseHandler;
import com.hazelcast.client.spi.properties.ClientProperty;
import com.hazelcast.instance.OutOfMemoryErrorDispatcher;
import com.hazelcast.internal.util.concurrent.MPSCQueue;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.properties.HazelcastProperty;
import com.hazelcast.util.HashUtil;
import com.hazelcast.util.MutableInteger;
import com.hazelcast.util.function.Supplier;
import java.util.concurrent.BlockingQueue;

/**
 * Supplies the {@link ClientResponseHandler} that processes response messages received by the client.
 *
 * <p>The handler implementation is chosen from the value of
 * {@link ClientProperty#RESPONSE_THREAD_COUNT}:
 * <ul>
 *     <li>{@code 0}: responses are processed directly on the thread that calls the handler;</li>
 *     <li>{@code 1}: responses are queued for a single response thread;</li>
 *     <li>more than {@code 1}: responses are spread over the response threads using a
 *     per-calling-thread counter.</li>
 * </ul>
 *
 * <p>{@link #start()} must be called before queued responses are consumed, and
 * {@link #shutdown()} interrupts the response threads.
 */
public class ClientResponseHandlerSupplier implements Supplier<ClientResponseHandler> {

    /**
     * Property handed to {@link InboundResponseHandlerSupplier#getIdleStrategy} to select the idle
     * strategy of each response queue. NOTE(review): "block" is presumably the default value.
     */
    private static final HazelcastProperty IDLE_STRATEGY
            = new HazelcastProperty("hazelcast.client.responsequeue.idlestrategy", "block");

    /**
     * Message type whose payload is turned into an exception instead of a regular response.
     * NOTE(review): presumably equal to the protocol's error codec type; confirm against the codec.
     */
    private static final int EXCEPTION_MESSAGE_TYPE = 109;

    /** Per-thread counter used to pick the target response thread when several are configured. */
    private static final ThreadLocal<MutableInteger> INT_HOLDER = new ThreadLocal<MutableInteger>() {
        @Override
        protected MutableInteger initialValue() {
            return new MutableInteger();
        }
    };

    private final AbstractClientInvocationService invocationService;
    private final ResponseThread[] responseThreads;
    private final HazelcastClientInstanceImpl client;
    private final ILogger logger;
    private final ClientResponseHandler responseHandler;

    /**
     * Creates the response threads (not yet started) and selects the matching response handler.
     *
     * @param invocationService the invocation service whose client, logger and registered
     *                          invocations are used while processing responses
     * @throws IllegalArgumentException if the configured response thread count is negative
     */
    public ClientResponseHandlerSupplier(AbstractClientInvocationService invocationService) {
        this.invocationService = invocationService;
        this.client = invocationService.client;
        this.logger = invocationService.invocationLogger;

        int responseThreadCount = client.getProperties().getInteger(ClientProperty.RESPONSE_THREAD_COUNT);
        if (responseThreadCount < 0) {
            throw new IllegalArgumentException(
                    ClientProperty.RESPONSE_THREAD_COUNT.getName() + " can't be smaller than 0");
        }
        logger.info("Running with " + responseThreadCount + " response threads");

        this.responseThreads = new ResponseThread[responseThreadCount];
        for (int k = 0; k < responseThreads.length; k++) {
            responseThreads[k] = new ResponseThread(invocationService.client.getName() + ".responsethread-" + k + "-");
        }

        switch (responseThreads.length) {
            case 0:
                this.responseHandler = new SyncResponseHandler();
                break;
            case 1:
                this.responseHandler = new AsyncSingleThreadedResponseHandler();
                break;
            default:
                this.responseHandler = new AsyncMultiThreadedResponseHandler();
        }
    }

    /** Starts every response thread. */
    public void start() {
        for (ResponseThread responseThread : responseThreads) {
            responseThread.start();
        }
    }

    /** Interrupts every response thread so that a blocked queue {@code take()} is woken up. */
    public void shutdown() {
        for (ResponseThread responseThread : responseThreads) {
            responseThread.interrupt();
        }
    }

    /**
     * @return the response handler selected at construction time
     */
    @Override
    public ClientResponseHandler get() {
        return responseHandler;
    }

    /**
     * Handles one response message and always decrements the pending packet count of the
     * connection afterwards.
     *
     * <p>An {@link Exception} raised while handling is logged and not rethrown, so the calling
     * thread keeps running.
     *
     * @param connection the connection the message was received on
     * @param message    the response message to handle
     */
    private void process(ClientConnection connection, ClientMessage message) {
        try {
            handleClientMessage(message);
        } catch (Exception e) {
            logger.severe("Failed to process task: " + new ClientPacket(connection, message)
                    + " on responseThread: " + Thread.currentThread().getName(), e);
        } finally {
            connection.decrementPendingPacketCount();
        }
    }

    /**
     * De-registers the invocation matching the correlation id of the message and notifies it
     * with either the response or the exception created from the message.
     *
     * @param clientMessage the response message
     */
    private void handleClientMessage(ClientMessage clientMessage) {
        long correlationId = clientMessage.getCorrelationId();
        ClientInvocation future = invocationService.deRegisterCallId(correlationId);
        if (future == null) {
            logger.warning("No call for callId: " + correlationId + ", response: " + clientMessage);
            return;
        }

        if (EXCEPTION_MESSAGE_TYPE == clientMessage.getMessageType()) {
            future.notifyException(client.getClientExceptionFactory().createException(clientMessage));
        } else {
            future.notify(clientMessage);
        }
    }

    /** Pairs a response message with the connection it arrived on so both can be queued together. */
    private static class ClientPacket {
        private final ClientConnection connection;
        private final ClientMessage message;

        ClientPacket(ClientConnection connection, ClientMessage message) {
            this.connection = connection;
            this.message = message;
        }

        /** Readable form used in log messages; without it only the identity hash would be logged. */
        @Override
        public String toString() {
            return "ClientPacket{connection=" + connection + ", message=" + message + '}';
        }
    }

    /** Queues each response on one of several response threads. */
    class AsyncMultiThreadedResponseHandler implements ClientResponseHandler {
        AsyncMultiThreadedResponseHandler() {
        }

        @Override
        public void handle(ClientMessage message, ClientConnection connection) {
            // Each calling thread advances its own counter, so it cycles over the response threads.
            int threadIndex = HashUtil.hashToIndex(INT_HOLDER.get().getAndInc(), responseThreads.length);
            responseThreads[threadIndex].responseQueue.add(new ClientPacket(connection, message));
        }
    }

    /** Queues each response on the single response thread. */
    class AsyncSingleThreadedResponseHandler implements ClientResponseHandler {
        AsyncSingleThreadedResponseHandler() {
        }

        @Override
        public void handle(ClientMessage message, ClientConnection connection) {
            responseThreads[0].responseQueue.add(new ClientPacket(connection, message));
        }
    }

    /** Processes each response directly on the thread that calls {@link #handle}. */
    class SyncResponseHandler implements ClientResponseHandler {
        SyncResponseHandler() {
        }

        @Override
        public void handle(ClientMessage message, ClientConnection connection) {
            process(connection, message);
        }
    }

    /** Thread that takes queued packets from its own queue and processes them one by one. */
    private class ResponseThread extends Thread {
        private final BlockingQueue<ClientPacket> responseQueue;

        ResponseThread(String name) {
            super(name);
            setContextClassLoader(client.getClientConfig().getClassLoader());
            this.responseQueue = new MPSCQueue<ClientPacket>(this,
                    InboundResponseHandlerSupplier.getIdleStrategy(client.getProperties(), IDLE_STRATEGY));
        }

        @Override
        public void run() {
            try {
                doRun();
            } catch (OutOfMemoryError e) {
                OutOfMemoryErrorDispatcher.onOutOfMemory(e);
            } catch (Throwable t) {
                invocationService.invocationLogger.severe(t);
            }
        }

        /** Consumes the queue until the invocation service reports that it is shut down. */
        private void doRun() {
            while (!invocationService.isShutdown()) {
                ClientPacket task;
                try {
                    task = responseQueue.take();
                } catch (InterruptedException e) {
                    // Deliberately not propagated: shutdown() interrupts this thread, and the loop
                    // condition decides whether to stop.
                    // NOTE(review): assumes isShutdown() is already true by then; confirm with caller.
                    continue;
                }
                process(task.connection, task.message);
            }
        }
    }
}

