/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.spi.impl;

import com.hazelcast.client.impl.clientside.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.spi.impl.AbstractClientInvocationService;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.properties.ClientProperty;
import com.hazelcast.instance.OutOfMemoryErrorDispatcher;
import com.hazelcast.internal.util.concurrent.MPSCQueue;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.spi.properties.HazelcastProperty;
import com.hazelcast.util.HashUtil;
import com.hazelcast.util.MutableInteger;
import com.hazelcast.util.function.Consumer;
import com.hazelcast.util.function.Supplier;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Supplies the {@link Consumer} that handles {@link ClientMessage} responses for
 * client invocations.
 *
 * <p>One of three handlers is chosen at construction time:
 * <ul>
 *   <li>{@link SyncResponseHandler} when the configured response thread count is 0:
 *       every response is processed on the thread that calls {@code accept};</li>
 *   <li>{@link DynamicResponseHandler} when response threads are configured and the
 *       dynamic flag is set: responses are processed on the calling thread while the
 *       number of concurrent invocations is at or below a threshold, and are handed
 *       to a lazily started response thread otherwise;</li>
 *   <li>{@link AsyncResponseHandler} in all other cases: every response is queued to
 *       a response thread.</li>
 * </ul>
 */
public class ClientResponseHandlerSupplier
implements Supplier<Consumer<ClientMessage>> {
    /** Property whose value selects the idle strategy of every response thread's queue. */
    private static final HazelcastProperty IDLE_STRATEGY = new HazelcastProperty("hazelcast.client.responsequeue.idlestrategy", "block");
    /** Property holding the concurrent-invocation threshold used by {@link DynamicResponseHandler}. */
    private static final HazelcastProperty MIN_CONCURRENT_INVOCATIONS = new HazelcastProperty("hazelcast.client.responsequeue.dynamic.min.concurrent.invocations", "4");
    /** Message type for which the response is turned into an exception instead of a result. */
    private static final int EXCEPTION_MESSAGE_TYPE = 109;
    /** Per-thread counter used to spread queued responses over the response threads. */
    private static final ThreadLocal<MutableInteger> INT_HOLDER = new ThreadLocal<MutableInteger>(){

        @Override
        protected MutableInteger initialValue() {
            return new MutableInteger();
        }
    };
    private final AbstractClientInvocationService invocationService;
    private final ResponseThread[] responseThreads;
    private final HazelcastClientInstanceImpl client;
    private final ILogger logger;
    private final Consumer<ClientMessage> responseHandler;
    private final boolean responseThreadsDynamic;
    private final int minConcurrentInvocations;

    /**
     * Reads the response-handling configuration from the client's properties,
     * creates (but does not start) the response threads and picks the handler.
     *
     * @param invocationService the invocation service whose client, logger and
     *                          registered invocations this supplier works with
     * @throws IllegalArgumentException if the configured response thread count is negative
     */
    public ClientResponseHandlerSupplier(AbstractClientInvocationService invocationService) {
        this.invocationService = invocationService;
        this.client = invocationService.client;
        this.logger = invocationService.invocationLogger;

        HazelcastProperties properties = client.getProperties();
        this.minConcurrentInvocations = properties.getInteger(MIN_CONCURRENT_INVOCATIONS);

        int threadCount = properties.getInteger(ClientProperty.RESPONSE_THREAD_COUNT);
        if (threadCount < 0) {
            throw new IllegalArgumentException(ClientProperty.RESPONSE_THREAD_COUNT.getName() + " can't be smaller than 0");
        }
        this.responseThreadsDynamic = properties.getBoolean(ClientProperty.RESPONSE_THREAD_DYNAMIC);
        logger.info("Running with " + threadCount + " response threads, dynamic=" + responseThreadsDynamic);

        this.responseThreads = new ResponseThread[threadCount];
        for (int i = 0; i < responseThreads.length; i++) {
            responseThreads[i] = new ResponseThread(client.getName() + ".responsethread-" + i + "-");
        }

        // No response threads: handle inline. Otherwise the dynamic flag decides.
        if (threadCount == 0) {
            this.responseHandler = new SyncResponseHandler();
        } else if (responseThreadsDynamic) {
            this.responseHandler = new DynamicResponseHandler();
        } else {
            this.responseHandler = new AsyncResponseHandler();
        }
    }

    /**
     * Starts all response threads, unless dynamic mode is enabled; in dynamic mode
     * a thread is started the first time a response is queued to it.
     */
    public void start() {
        if (!responseThreadsDynamic) {
            for (ResponseThread thread : responseThreads) {
                thread.start();
            }
        }
    }

    /**
     * Interrupts every response thread so that a thread blocked on its queue
     * re-evaluates the shutdown state of the invocation service.
     */
    public void shutdown() {
        for (ResponseThread thread : responseThreads) {
            thread.interrupt();
        }
    }

    /**
     * Returns the response handler selected at construction time.
     *
     * @return the consumer to which incoming responses are passed
     */
    public Consumer<ClientMessage> get() {
        return responseHandler;
    }

    /**
     * Handles a single response and logs, rather than propagates, any
     * {@link Exception} raised while doing so.
     *
     * @param response the response to handle
     */
    private void process(ClientMessage response) {
        try {
            handleResponse(response);
        }
        catch (Exception e) {
            logger.severe("Failed to process response: " + response + " on responseThread: " + Thread.currentThread().getName(), e);
        }
    }

    /**
     * De-registers the invocation matching the message's correlation id and
     * completes it, either with an exception created from the message or with
     * the message itself. A warning is logged when no invocation is registered.
     *
     * @param message the response message
     */
    private void handleResponse(ClientMessage message) {
        long correlationId = message.getCorrelationId();
        ClientInvocation invocation = invocationService.deRegisterCallId(correlationId);
        if (invocation == null) {
            logger.warning("No call for callId: " + correlationId + ", response: " + message);
            return;
        }

        boolean isException = EXCEPTION_MESSAGE_TYPE == message.getMessageType();
        if (!isException) {
            invocation.notify(message);
            return;
        }
        invocation.notifyException(client.getClientExceptionFactory().createException(message));
    }

    /**
     * Picks the response thread for the next queued response. With a single
     * thread that thread is always returned; otherwise the calling thread's
     * counter is advanced and hashed onto the thread array.
     *
     * @return the response thread to queue to
     */
    private ResponseThread nextResponseThread() {
        int threadCount = responseThreads.length;
        if (threadCount == 1) {
            return responseThreads[0];
        }
        int sequence = INT_HOLDER.get().getAndInc();
        return responseThreads[HashUtil.hashToIndex(sequence, threadCount)];
    }

    /**
     * Handler that processes responses on the calling thread while the number of
     * concurrent invocations does not exceed the configured threshold, and
     * otherwise queues them to a response thread that is started on demand.
     */
    class DynamicResponseHandler
    implements Consumer<ClientMessage> {

        /**
         * Processes the message inline or hands it to a response thread.
         *
         * @param message the response message
         */
        public void accept(ClientMessage message) {
            boolean lowConcurrency = invocationService.concurrentInvocations() <= minConcurrentInvocations;
            if (lowConcurrency) {
                process(message);
                return;
            }
            ResponseThread target = nextResponseThread();
            // Queue first, then make sure the consuming thread is running.
            target.queue(message);
            target.ensureStarted();
        }
    }

    /** Handler that always queues the response to a response thread. */
    class AsyncResponseHandler
    implements Consumer<ClientMessage> {

        /**
         * Queues the message to the next response thread.
         *
         * @param message the response message
         */
        public void accept(ClientMessage message) {
            ResponseThread target = nextResponseThread();
            target.queue(message);
        }
    }

    /** Handler that always processes the response on the calling thread. */
    class SyncResponseHandler
    implements Consumer<ClientMessage> {

        /**
         * Processes the message immediately.
         *
         * @param message the response message
         */
        public void accept(ClientMessage message) {
            process(message);
        }
    }

    /**
     * Thread that takes responses from its own queue and processes them until
     * the invocation service reports that it is shut down.
     */
    private class ResponseThread
    extends Thread {
        private final BlockingQueue<ClientMessage> responseQueue;
        /** Set once by whichever caller wins the right to start this thread lazily. */
        private final AtomicBoolean started = new AtomicBoolean();

        /**
         * Creates the thread, gives it the client's configured class loader as
         * context class loader and builds its queue with the configured idle strategy.
         *
         * @param name the thread name
         */
        ResponseThread(String name) {
            super(name);
            setContextClassLoader(client.getClientConfig().getClassLoader());
            this.responseQueue = new MPSCQueue<ClientMessage>(this, InboundResponseHandlerSupplier.getIdleStrategy(client.getProperties(), IDLE_STRATEGY));
        }

        /**
         * Runs the processing loop. An {@link OutOfMemoryError} is forwarded to the
         * {@link OutOfMemoryErrorDispatcher}; any other throwable is logged.
         */
        @Override
        public void run() {
            try {
                doRun();
            }
            catch (OutOfMemoryError e) {
                OutOfMemoryErrorDispatcher.onOutOfMemory(e);
            }
            catch (Throwable t) {
                invocationService.invocationLogger.severe(t);
            }
        }

        /** Takes and processes responses until the invocation service is shut down. */
        private void doRun() {
            while (!invocationService.isShutdown()) {
                try {
                    process(responseQueue.take());
                }
                catch (InterruptedException ignored) {
                    // Interruption only wakes the loop up so the shutdown check above runs again.
                }
            }
        }

        /**
         * Adds a response to this thread's queue.
         *
         * @param message the response to queue
         */
        private void queue(ClientMessage message) {
            responseQueue.add(message);
        }

        /** Starts this thread if no caller has started it through this method before. */
        @SuppressFBWarnings(value={"IA_AMBIGUOUS_INVOCATION_OF_INHERITED_OR_OUTER_METHOD"}, justification="The thread.start method is the one we want to call")
        private void ensureStarted() {
            if (started.get()) {
                return;
            }
            if (started.compareAndSet(false, true)) {
                start();
            }
        }
    }
}

