/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests;

import com.google.common.base.Charsets;
import com.google.common.io.FileWriteMode;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Collectors;
import org.apache.pulsar.buildtools.shaded.org.apache.commons.lang3.ThreadUtils;
import org.apache.pulsar.buildtools.shaded.org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.pulsar.tests.BetweenTestClassesListenerAdapter;
import org.apache.pulsar.tests.ThreadDumpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.IClass;
import org.testng.ISuite;
import org.testng.ISuiteListener;
import org.testng.ITestClass;

public class ThreadLeakDetectorListener
extends BetweenTestClassesListenerAdapter
implements ISuiteListener {
    private static final Logger LOG = LoggerFactory.getLogger(ThreadLeakDetectorListener.class);
    private static final long WAIT_FOR_THREAD_TERMINATION_MILLIS = Long.parseLong(System.getenv().getOrDefault("THREAD_LEAK_DETECTOR_WAIT_MILLIS", "0"));
    private static final File DUMP_DIR = new File(System.getenv().getOrDefault("THREAD_LEAK_DETECTOR_DIR", "target/thread-leak-dumps"));
    private static final long THREAD_TERMINATION_POLL_INTERVAL = Long.parseLong(System.getenv().getOrDefault("THREAD_LEAK_DETECTOR_POLL_INTERVAL", "250"));
    private static final boolean COLLECT_THREADDUMP = Boolean.parseBoolean(System.getenv().getOrDefault("THREAD_LEAK_DETECTOR_COLLECT_THREADDUMP", "true"));
    private Set<ThreadKey> capturedThreadKeys;
    private static final Field THREAD_TARGET_FIELD;

    public void onStart(ISuite suite) {
        this.detectLeakedThreads(Collections.emptyList());
    }

    @Override
    protected void onBetweenTestClasses(List<ITestClass> testClasses) {
        this.detectLeakedThreads(testClasses);
    }

    private static String joinTestClassNames(List<ITestClass> testClasses) {
        return testClasses.stream().map(IClass::getRealClass).map(Class::getName).collect(Collectors.joining(", "));
    }

    private static String joinSimpleTestClassNames(List<ITestClass> testClasses) {
        return testClasses.stream().map(IClass::getRealClass).map(Class::getSimpleName).collect(Collectors.joining(", "));
    }

    private static String firstTestClassName(List<ITestClass> testClasses) {
        return ((ITestClass)testClasses.stream().findFirst().get()).getRealClass().getName();
    }

    private void detectLeakedThreads(List<ITestClass> testClasses) {
        LOG.info("Capturing identifiers of running threads.");
        MutableBoolean differenceDetected = new MutableBoolean();
        Set<ThreadKey> currentThreadKeys = ThreadLeakDetectorListener.compareThreads(this.capturedThreadKeys, testClasses, WAIT_FOR_THREAD_TERMINATION_MILLIS <= 0L, differenceDetected, null);
        if (WAIT_FOR_THREAD_TERMINATION_MILLIS > 0L && !testClasses.isEmpty() && differenceDetected.booleanValue()) {
            LOG.info("Difference detected in active threads. Waiting up to {} ms for threads to terminate.", (Object)WAIT_FOR_THREAD_TERMINATION_MILLIS);
            long endTime = System.currentTimeMillis() + WAIT_FOR_THREAD_TERMINATION_MILLIS;
            while (System.currentTimeMillis() < endTime) {
                try {
                    Thread.sleep(THREAD_TERMINATION_POLL_INTERVAL);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                differenceDetected.setFalse();
                currentThreadKeys = ThreadLeakDetectorListener.compareThreads(this.capturedThreadKeys, testClasses, false, differenceDetected, null);
                if (differenceDetected.booleanValue()) continue;
            }
            if (differenceDetected.booleanValue()) {
                String datetimePart = DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss.SSS").format(ZonedDateTime.now());
                PrintWriter out = null;
                String firstTestClassName = ThreadLeakDetectorListener.firstTestClassName(testClasses);
                try {
                    if (!DUMP_DIR.exists()) {
                        DUMP_DIR.mkdirs();
                    }
                    File threadleakdumpFile = new File(DUMP_DIR, "threadleak" + datetimePart + firstTestClassName + ".txt");
                    out = new PrintWriter(threadleakdumpFile);
                }
                catch (IOException e) {
                    LOG.error("Cannot write thread leak dump", (Throwable)e);
                }
                currentThreadKeys = ThreadLeakDetectorListener.compareThreads(this.capturedThreadKeys, testClasses, true, null, out);
                if (out != null) {
                    out.close();
                }
                if (COLLECT_THREADDUMP) {
                    File threaddumpFile = new File(DUMP_DIR, "threaddump" + datetimePart + firstTestClassName + ".txt");
                    try {
                        Files.asCharSink((File)threaddumpFile, (Charset)Charsets.UTF_8, (FileWriteMode[])new FileWriteMode[0]).write((CharSequence)ThreadDumpUtil.buildThreadDiagnosticString());
                    }
                    catch (IOException e) {
                        LOG.error("Cannot write thread dump", (Throwable)e);
                    }
                }
            }
        }
        this.capturedThreadKeys = currentThreadKeys;
    }

    private static Set<ThreadKey> compareThreads(Set<ThreadKey> previousThreadKeys, List<ITestClass> testClasses, boolean logDifference, MutableBoolean differenceDetected, PrintWriter out) {
        Set<ThreadKey> threadKeys = Collections.unmodifiableSet(ThreadUtils.getAllThreads().stream().filter(thread -> !ThreadLeakDetectorListener.shouldSkipThread(thread)).map(ThreadKey::of).collect(Collectors.toCollection(LinkedHashSet::new)));
        if (!testClasses.isEmpty() && previousThreadKeys != null) {
            int newThreadsCounter = 0;
            for (ThreadKey threadKey : threadKeys) {
                if (previousThreadKeys.contains(threadKey)) continue;
                ++newThreadsCounter;
                if (differenceDetected != null) {
                    differenceDetected.setTrue();
                }
                if (!logDifference && out == null) continue;
                String message = String.format("Tests in class %s created thread id %d with name '%s'", ThreadLeakDetectorListener.joinSimpleTestClassNames(testClasses), threadKey.getThreadId(), threadKey.getThreadName());
                if (logDifference) {
                    LOG.warn(message);
                }
                if (out == null) continue;
                out.println(message);
            }
            if (newThreadsCounter > 0 && (logDifference || out != null)) {
                String message = String.format("Summary: Tests in class %s created %d new threads. There are now %d threads in total.", ThreadLeakDetectorListener.joinTestClassNames(testClasses), newThreadsCounter, threadKeys.size());
                if (logDifference) {
                    LOG.warn(message);
                }
                if (out != null) {
                    out.println(message);
                }
            }
        }
        return threadKeys;
    }

    private static boolean shouldSkipThread(Thread thread) {
        String targetClassName;
        Runnable target;
        if (thread instanceof ForkJoinWorkerThread) {
            return true;
        }
        ThreadGroup threadGroup = thread.getThreadGroup();
        if (threadGroup != null && "testcontainers".equals(threadGroup.getName())) {
            return true;
        }
        String threadName = thread.getName();
        if (threadName != null) {
            if (threadName.startsWith("ClientTestFixtures-SCHEDULER-")) {
                return true;
            }
            if (threadName.equals("process reaper")) {
                return true;
            }
            if (threadName.equals("Attach Listener")) {
                return true;
            }
            if (threadName.equals("CompletableFutureDelayScheduler")) {
                return true;
            }
            if (threadName.equals("FailsafeDelayScheduler")) {
                return true;
            }
            if (threadName.equals("Okio Watchdog")) {
                return true;
            }
            if (threadName.equals("OkHttp TaskRunner")) {
                return true;
            }
            if (threadName.equals("JNA Cleaner")) {
                return true;
            }
            if (threadName.equals("Grizzly-HttpSession-Expirer")) {
                return true;
            }
            if (threadName.startsWith("testcontainers-wait-")) {
                return true;
            }
            if (threadName.startsWith("ducttape-")) {
                return true;
            }
        }
        return (target = ThreadLeakDetectorListener.extractRunnableTarget(thread)) != null && (targetClassName = target.getClass().getName()).startsWith("org.testcontainers.");
    }

    private static Runnable extractRunnableTarget(Thread thread) {
        if (THREAD_TARGET_FIELD == null) {
            return null;
        }
        Runnable target = null;
        try {
            target = (Runnable)THREAD_TARGET_FIELD.get(thread);
        }
        catch (IllegalAccessException e) {
            LOG.warn("Cannot access target field in Thread.class", (Throwable)e);
        }
        return target;
    }

    static {
        Field targetField = null;
        try {
            targetField = Thread.class.getDeclaredField("target");
            targetField.setAccessible(true);
        }
        catch (NoSuchFieldException noSuchFieldException) {
            // empty catch block
        }
        THREAD_TARGET_FIELD = targetField;
    }

    private static class ThreadKey {
        private final long threadId;
        private final int threadIdentityHashCode;
        private final String threadName;

        private ThreadKey(long threadId, int threadIdentityHashCode, String threadName) {
            this.threadId = threadId;
            this.threadIdentityHashCode = threadIdentityHashCode;
            this.threadName = threadName;
        }

        static ThreadKey of(Thread thread) {
            return new ThreadKey(thread.getId(), System.identityHashCode(thread), thread.toString());
        }

        public long getThreadId() {
            return this.threadId;
        }

        public String getThreadName() {
            return this.threadName;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ThreadKey threadKey = (ThreadKey)o;
            return this.threadId == threadKey.threadId && this.threadIdentityHashCode == threadKey.threadIdentityHashCode;
        }

        public int hashCode() {
            return Objects.hash(this.threadId, this.threadIdentityHashCode);
        }
    }
}

