/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.nio.tcp.nonblocking;

import com.hazelcast.instance.HazelcastThreadGroup;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.nio.IOService;
import com.hazelcast.nio.tcp.IOThreadingModel;
import com.hazelcast.nio.tcp.SocketReader;
import com.hazelcast.nio.tcp.SocketWriter;
import com.hazelcast.nio.tcp.TcpIpConnection;
import com.hazelcast.nio.tcp.nonblocking.NonBlockingIOThread;
import com.hazelcast.nio.tcp.nonblocking.NonBlockingIOThreadOutOfMemoryHandler;
import com.hazelcast.nio.tcp.nonblocking.NonBlockingSocketReader;
import com.hazelcast.nio.tcp.nonblocking.NonBlockingSocketWriter;
import com.hazelcast.nio.tcp.nonblocking.iobalancer.IOBalancer;
import com.hazelcast.util.HashUtil;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

/**
 * {@link IOThreadingModel} implementation that reports itself as non-blocking
 * ({@link #isBlocking()} returns {@code false}) and hands every new connection
 * a reader bound to one of its input {@link NonBlockingIOThread}s and a writer
 * bound to one of its output {@link NonBlockingIOThread}s.
 *
 * <p>The number of input and output threads is taken from the {@link IOService}
 * at construction time; the threads themselves are only created in
 * {@link #start()}. Until then (and again after {@link #shutdown()}) the slots
 * of both thread arrays are {@code null}.
 */
public class NonBlockingIOThreadingModel
implements IOThreadingModel {
    private final NonBlockingIOThread[] inputThreads;
    private final NonBlockingIOThread[] outputThreads;
    // Monotonic counters used to pick the thread for the next reader/writer.
    private final AtomicInteger nextInputThreadIndex = new AtomicInteger();
    private final AtomicInteger nextOutputThreadIndex = new AtomicInteger();
    private final ILogger logger;
    private final IOService ioService;
    private final MetricsRegistry metricsRegistry;
    private final LoggingService loggingService;
    private final HazelcastThreadGroup hazelcastThreadGroup;
    // Defaults come from JVM system properties; may be overridden through the setters before start().
    private boolean inputSelectNow = Boolean.getBoolean("hazelcast.io.input.thread.selectNow");
    private boolean outputSelectNow = Boolean.getBoolean("hazelcast.io.output.thread.selectNow");
    // Assigned in startIOBalancer(); null until start() has been called.
    private volatile IOBalancer ioBalancer;

    /**
     * Creates the model and sizes the (still empty) input/output thread arrays.
     *
     * @param ioService            source of thread counts, thread group/prefix, loggers and the
     *                             balancer interval
     * @param loggingService       used to obtain this class' logger and passed on to the {@link IOBalancer}
     * @param metricsRegistry      registry the IO threads and the balancer are registered with
     * @param hazelcastThreadGroup passed on to the {@link IOBalancer}
     */
    public NonBlockingIOThreadingModel(IOService ioService, LoggingService loggingService, MetricsRegistry metricsRegistry, HazelcastThreadGroup hazelcastThreadGroup) {
        this.ioService = ioService;
        this.hazelcastThreadGroup = hazelcastThreadGroup;
        this.metricsRegistry = metricsRegistry;
        this.loggingService = loggingService;
        this.logger = loggingService.getLogger(NonBlockingIOThreadingModel.class);
        this.inputThreads = new NonBlockingIOThread[ioService.getInputSelectorThreadCount()];
        this.outputThreads = new NonBlockingIOThread[ioService.getOutputSelectorThreadCount()];
    }

    /**
     * Overrides the selectNow flag handed to input threads created by {@link #start()}.
     *
     * @param enabled the value passed to each input thread's constructor
     */
    public void setInputSelectNow(boolean enabled) {
        this.inputSelectNow = enabled;
    }

    /**
     * Overrides the selectNow flag handed to output threads created by {@link #start()}.
     *
     * @param enabled the value passed to each output thread's constructor
     */
    public void setOutputSelectNow(boolean enabled) {
        this.outputSelectNow = enabled;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isBlocking() {
        return false;
    }

    /**
     * Returns the internal input thread array itself (not a copy).
     *
     * @return the input threads; slots are {@code null} before {@link #start()} and after {@link #shutdown()}
     */
    @SuppressFBWarnings(value={"EI_EXPOSE_REP"}, justification="used only for testing")
    public NonBlockingIOThread[] getInputThreads() {
        return this.inputThreads;
    }

    /**
     * Returns the internal output thread array itself (not a copy).
     *
     * @return the output threads; slots are {@code null} before {@link #start()} and after {@link #shutdown()}
     */
    @SuppressFBWarnings(value={"EI_EXPOSE_REP"}, justification="used only for testing")
    public NonBlockingIOThread[] getOutputThreads() {
        return this.outputThreads;
    }

    /**
     * @return the balancer created by {@link #start()}, or {@code null} if the model has not been started
     */
    public IOBalancer getIOBalancer() {
        return this.ioBalancer;
    }

    /**
     * Logs the configuration, creates and starts all input threads, then all output
     * threads, and finally creates and starts the {@link IOBalancer}.
     */
    @Override
    public void start() {
        this.logger.info("TcpIpConnectionManager configured with Non Blocking IO-threading model: " + this.inputThreads.length + " input threads and " + this.outputThreads.length + " output threads");
        // An enabled selectNow is logged at INFO, a disabled one only at FINE.
        this.logger.log(this.inputSelectNow ? Level.INFO : Level.FINE, "InputThreads selectNow enabled=" + this.inputSelectNow);
        this.logger.log(this.outputSelectNow ? Level.INFO : Level.FINE, "OutputThreads selectNow enabled=" + this.outputSelectNow);

        // A single handler instance is shared by every IO thread and forwards to the IOService.
        NonBlockingIOThreadOutOfMemoryHandler oomeHandler = new NonBlockingIOThreadOutOfMemoryHandler(){

            @Override
            public void handle(OutOfMemoryError error) {
                NonBlockingIOThreadingModel.this.ioService.onOutOfMemory(error);
            }
        };

        this.startThreads(this.inputThreads, "in-", this.inputSelectNow, oomeHandler);
        this.startThreads(this.outputThreads, "out-", this.outputSelectNow, oomeHandler);
        this.startIOBalancer();
    }

    /**
     * Fills every slot of {@code threads} with a new {@link NonBlockingIOThread}, registers it
     * with the metrics registry under {@code "tcp." + threadName} and starts it.
     *
     * @param threads     array to populate
     * @param kind        name fragment placed between the IOService thread prefix and the index
     * @param selectNow   selectNow flag passed to each thread's constructor
     * @param oomeHandler handler passed to each thread's constructor
     */
    private void startThreads(NonBlockingIOThread[] threads, String kind, boolean selectNow, NonBlockingIOThreadOutOfMemoryHandler oomeHandler) {
        for (int i = 0; i < threads.length; ++i) {
            NonBlockingIOThread thread = new NonBlockingIOThread(
                    this.ioService.getThreadGroup(),
                    this.ioService.getThreadPrefix() + kind + i,
                    this.ioService.getLogger(NonBlockingIOThread.class.getName()),
                    oomeHandler,
                    selectNow);
            threads[i] = thread;
            this.metricsRegistry.scanAndRegister(thread, "tcp." + thread.getName());
            thread.start();
        }
    }

    /**
     * Forwards the new connection to the {@link IOBalancer}.
     * Must not be called before {@link #start()}: the balancer does not exist yet.
     *
     * @param connection the connection that was added
     */
    @Override
    public void onConnectionAdded(TcpIpConnection connection) {
        this.ioBalancer.connectionAdded(connection);
    }

    /**
     * Forwards the removed connection to the {@link IOBalancer}.
     * Must not be called before {@link #start()}: the balancer does not exist yet.
     *
     * @param connection the connection that was removed
     */
    @Override
    public void onConnectionRemoved(TcpIpConnection connection) {
        this.ioBalancer.connectionRemoved(connection);
    }

    /**
     * Creates the {@link IOBalancer} over both thread arrays, starts it and registers it
     * with the metrics registry as {@code "tcp.balancer"}.
     */
    private void startIOBalancer() {
        this.ioBalancer = new IOBalancer(this.inputThreads, this.outputThreads, this.hazelcastThreadGroup, this.ioService.getBalancerIntervalSeconds(), this.loggingService);
        this.ioBalancer.start();
        this.metricsRegistry.scanAndRegister(this.ioBalancer, "tcp.balancer");
    }

    /**
     * Stops the {@link IOBalancer} (if one was started) and shuts down every input and
     * output thread, clearing the corresponding array slots.
     *
     * <p>Safe to call on a model that was never started: the thread slots are then
     * {@code null} and are skipped, and the missing balancer is skipped as well.
     */
    @Override
    public void shutdown() {
        // Read the volatile field once so the null check and the call see the same value.
        IOBalancer balancer = this.ioBalancer;
        if (balancer != null) {
            balancer.stop();
        }
        if (this.logger.isFinestEnabled()) {
            this.logger.finest("Shutting down IO selectors... Total: " + (this.inputThreads.length + this.outputThreads.length));
        }
        this.shutdown(this.inputThreads);
        this.shutdown(this.outputThreads);
    }

    /**
     * Shuts down every non-null thread in {@code threads} and nulls out each slot.
     *
     * @param threads array of IO threads; {@code null} slots are tolerated
     */
    private void shutdown(NonBlockingIOThread[] threads) {
        for (int i = 0; i < threads.length; ++i) {
            NonBlockingIOThread ioThread = threads[i];
            if (ioThread != null) {
                ioThread.shutdown();
            }
            threads[i] = null;
        }
    }

    /**
     * Creates a writer for {@code connection} bound to one of the output threads.
     * The thread is chosen by mapping an incrementing counter onto the array via
     * {@link HashUtil#hashToIndex} (presumably a round-robin distribution; see HashUtil).
     *
     * @param connection the connection the writer belongs to
     * @return a new {@link NonBlockingSocketWriter}
     */
    @Override
    public SocketWriter newSocketWriter(TcpIpConnection connection) {
        int index = HashUtil.hashToIndex(this.nextOutputThreadIndex.getAndIncrement(), this.outputThreads.length);
        return new NonBlockingSocketWriter(connection, this.outputThreads[index], this.metricsRegistry);
    }

    /**
     * Creates a reader for {@code connection} bound to one of the input threads.
     * The thread is chosen by mapping an incrementing counter onto the array via
     * {@link HashUtil#hashToIndex} (presumably a round-robin distribution; see HashUtil).
     *
     * @param connection the connection the reader belongs to
     * @return a new {@link NonBlockingSocketReader}
     */
    @Override
    public SocketReader newSocketReader(TcpIpConnection connection) {
        int index = HashUtil.hashToIndex(this.nextInputThreadIndex.getAndIncrement(), this.inputThreads.length);
        return new NonBlockingSocketReader(connection, this.inputThreads[index], this.metricsRegistry);
    }
}

