/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import lombok.Generated;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutorProvider {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ExecutorProvider.class);
    private final int numThreads;
    private final List<Pair<ExecutorService, ExtendedThreadFactory>> executors;
    private final AtomicInteger currentThread = new AtomicInteger(0);
    private final String poolName;
    private volatile boolean isShutdown;

    public ExecutorProvider(int numThreads, String poolName) {
        this(numThreads, poolName, Thread.currentThread().isDaemon());
    }

    public ExecutorProvider(int numThreads, String poolName, boolean daemon) {
        this(numThreads, poolName, daemon, ExtendedThreadFactory::new);
    }

    @VisibleForTesting
    public ExecutorProvider(int numThreads, String poolName, boolean daemon, BiFunction<String, Boolean, ExtendedThreadFactory> threadFactoryCreator) {
        Preconditions.checkArgument(numThreads > 0);
        this.numThreads = numThreads;
        Objects.requireNonNull(poolName);
        this.executors = new ArrayList<Pair<ExecutorService, ExtendedThreadFactory>>(numThreads);
        for (int i = 0; i < numThreads; ++i) {
            ExtendedThreadFactory threadFactory = threadFactoryCreator.apply(poolName, daemon);
            ExecutorService executor = this.createExecutor(threadFactory);
            this.executors.add(Pair.of(executor, threadFactory));
        }
        this.isShutdown = false;
        this.poolName = poolName;
    }

    protected ExecutorService createExecutor(ExtendedThreadFactory threadFactory) {
        return Executors.newSingleThreadExecutor(threadFactory);
    }

    public ExecutorService getExecutor() {
        return this.executors.get((this.currentThread.getAndIncrement() & Integer.MAX_VALUE) % this.numThreads).getKey();
    }

    public ExecutorService getExecutor(Object object) {
        return this.getExecutorInternal(object == null ? -1 : object.hashCode() & Integer.MAX_VALUE);
    }

    public ExecutorService getExecutor(byte[] bytes) {
        int keyHash = Murmur3_32Hash.getInstance().makeHash(bytes);
        return this.getExecutorInternal(keyHash);
    }

    private ExecutorService getExecutorInternal(int hash) {
        return this.executors.get((hash & Integer.MAX_VALUE) % this.numThreads).getKey();
    }

    public void shutdownNow() {
        this.executors.forEach(entry -> {
            ExecutorService executor = (ExecutorService)entry.getKey();
            ExtendedThreadFactory threadFactory = (ExtendedThreadFactory)entry.getValue();
            executor.shutdownNow();
            try {
                if (!executor.awaitTermination(10L, TimeUnit.SECONDS)) {
                    log.warn("Failed to terminate executor with pool name {} within timeout. The following are stack traces of still running threads.\n{}", (Object)this.poolName, (Object)this.getThreadDump(threadFactory.getThread()));
                }
            }
            catch (InterruptedException e) {
                log.warn("Shutdown of thread pool was interrupted");
            }
        });
        this.isShutdown = true;
    }

    public boolean isShutdown() {
        return this.isShutdown;
    }

    private String getThreadDump(Thread thread) {
        StringBuilder dump = new StringBuilder();
        dump.append('\n');
        dump.append(String.format("\"%s\" %s prio=%d tid=%d %s%njava.lang.Thread.State: %s", thread.getName(), thread.isDaemon() ? "daemon" : "", thread.getPriority(), thread.getId(), Thread.State.WAITING.equals((Object)thread.getState()) ? "in Object.wait()" : thread.getState().name(), Thread.State.WAITING.equals((Object)thread.getState()) ? "WAITING (on object monitor)" : thread.getState()));
        for (StackTraceElement stackTraceElement : thread.getStackTrace()) {
            dump.append("\n        at ");
            dump.append(stackTraceElement);
        }
        dump.append("\n");
        return dump.toString();
    }

    public static class ExtendedThreadFactory
    extends DefaultThreadFactory {
        private volatile Thread thread;

        public ExtendedThreadFactory(String poolName) {
            super(poolName, false);
        }

        public ExtendedThreadFactory(String poolName, boolean daemon) {
            super(poolName, daemon);
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = super.newThread(r);
            thread.setUncaughtExceptionHandler((t, e) -> log.error("Thread {} got uncaught Exception", (Object)t.getName(), (Object)e));
            this.thread = thread;
            return thread;
        }

        @Generated
        public Thread getThread() {
            return this.thread;
        }
    }
}

