/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.executor;

import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import org.neo4j.helpers.Exceptions;
import org.neo4j.kernel.impl.transaction.log.ParkStrategy;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutor;

/**
 * A {@link TaskExecutor} that runs submitted {@link Callable tasks} on a set of dedicated
 * {@link Processor} threads fed from a bounded queue. The number of processor threads can be
 * changed while the executor is running.
 *
 * <p>If any task throws, the executor shuts itself down and remembers the failure; later calls
 * that require a running executor fail with an {@link IllegalStateException} carrying that
 * failure as its cause.
 *
 * <p>Methods that change the set of processors or shut the executor down are {@code synchronized}
 * on this instance. {@link #submit(Callable)} is not.
 */
public class DynamicTaskExecutor
implements TaskExecutor {
    /** Default strategy used for waiting; NOTE(review): the argument is presumably a park time in milliseconds - confirm against ParkStrategy.Park. */
    public static final ParkStrategy DEFAULT_PARK_STRATEGY = new ParkStrategy.Park(10);

    /** Handler that ignores the exception a processor thread rethrows after a task failure. */
    private static final Thread.UncaughtExceptionHandler SILENT_UNCAUGHT_EXCEPTION_HANDLER = new Thread.UncaughtExceptionHandler(){

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            // Intentionally empty: the failure is kept in shutDownCause and reported from there.
        }
    };

    private final BlockingQueue<Callable<?>> queue;
    private final ParkStrategy parkStrategy;
    private final String processorThreadNamePrefix;
    // Replaced wholesale (never mutated in place) so unsynchronized readers always see a consistent array.
    private volatile Processor[] processors = new Processor[0];
    private volatile boolean shutDown;
    // First task failure observed, or null. Written before shutDown so readers of shutDown see it.
    private volatile Throwable shutDownCause;

    /**
     * Creates the executor and immediately starts its processor threads.
     *
     * @param initialProcessorCount number of processor threads to start with
     * @param maxQueueSize capacity of the task queue
     * @param parkStrategy strategy used whenever a thread has to wait
     * @param processorThreadNamePrefix prefix of processor thread names; threads are named
     *        {@code prefix-index}
     */
    public DynamicTaskExecutor(int initialProcessorCount, int maxQueueSize, ParkStrategy parkStrategy, String processorThreadNamePrefix) {
        this.parkStrategy = parkStrategy;
        this.processorThreadNamePrefix = processorThreadNamePrefix;
        this.queue = new ArrayBlockingQueue<>(maxQueueSize);
        this.setNumberOfProcessors(initialProcessorCount);
    }

    /**
     * Grows or shrinks the set of processor threads to exactly {@code count}. Growing starts new
     * threads; shrinking flags the surplus threads to stop after their current task.
     *
     * @param count desired number of processors; checked to be positive by an {@code assert} only
     * @throws IllegalStateException if the executor has been shut down
     */
    @Override
    public synchronized void setNumberOfProcessors(int count) {
        this.assertNotShutDown();
        assert (count > 0);
        Processor[] current = this.processors;
        if (count == current.length) {
            return;
        }
        Processor[] resized = Arrays.copyOf(current, count);
        if (count > current.length) {
            // Fill the newly added slots with freshly started processors.
            for (int i = current.length; i < resized.length; ++i) {
                resized[i] = new Processor(this.processorThreadNamePrefix + "-" + i);
            }
        } else {
            // Ask the processors that did not make it into the new array to stop.
            for (int i = resized.length; i < current.length; ++i) {
                current[i].shutDown = true;
            }
        }
        this.processors = resized;
    }

    /**
     * @return the current number of processors
     */
    @Override
    public int numberOfProcessors() {
        return this.processors.length;
    }

    /**
     * Adds one processor.
     *
     * @return always {@code true}
     * @throws IllegalStateException if the executor has been shut down
     */
    @Override
    public synchronized boolean incrementNumberOfProcessors() {
        this.setNumberOfProcessors(this.numberOfProcessors() + 1);
        return true;
    }

    /**
     * Removes one processor unless only one is left.
     *
     * @return {@code false} if there is exactly one processor (nothing is changed), otherwise {@code true}
     * @throws IllegalStateException if the executor has been shut down
     */
    @Override
    public synchronized boolean decrementNumberOfProcessors() {
        if (this.numberOfProcessors() == 1) {
            return false;
        }
        this.setNumberOfProcessors(this.numberOfProcessors() - 1);
        return true;
    }

    /**
     * Queues {@code task} for execution. While the queue is full the calling thread waits using
     * the park strategy and retries; afterwards all processors are unparked.
     *
     * @param task task to execute
     * @throws IllegalStateException if the executor is, or becomes while waiting, shut down
     */
    @Override
    public void submit(Callable<?> task) {
        this.assertNotShutDown();
        while (!this.queue.offer(task)) {
            this.parkAWhile();
            this.assertNotShutDown();
        }
        this.notifyProcessors();
    }

    /**
     * @throws IllegalStateException if the executor has been shut down; the task failure that
     *         triggered the shutdown, if any, is attached as the cause
     */
    private void assertNotShutDown() {
        if (this.shutDown) {
            String message = "Executor has been shut down";
            Throwable cause = this.shutDownCause;
            throw cause != null ? new IllegalStateException(message, cause) : new IllegalStateException(message);
        }
    }

    /** Unparks every current processor so that idle ones look at the queue again. */
    private void notifyProcessors() {
        for (Processor processor : this.processors) {
            this.parkStrategy.unpark(processor);
        }
    }

    /**
     * Shuts the executor down. Does nothing if it is already shut down.
     *
     * @param awaitAllCompleted if {@code true}, waits for the queue to drain and for all processor
     *        threads to finish before returning. The wait is abandoned as soon as a task failure
     *        is recorded.
     */
    @Override
    public void shutdown(boolean awaitAllCompleted) {
        this.shutdown0(awaitAllCompleted, null);
    }

    /**
     * Shared shutdown routine for regular shutdown and for shutdown caused by a failing task.
     *
     * @param awaitAllCompleted whether to wait for queued and running tasks to complete
     * @param cause the task failure causing the shutdown, or {@code null} for a regular shutdown
     */
    private synchronized void shutdown0(boolean awaitAllCompleted, Throwable cause) {
        if (this.shutDown) {
            return;
        }
        if (cause != null) {
            this.recordFailure(cause);
        }
        this.shutDown = true;
        // Both waits below give up once a failure has been recorded. A failing processor needs
        // this monitor to get through its own shutdown0 call, so waiting here for it to die
        // while holding the monitor would never finish.
        while (awaitAllCompleted && this.shutDownCause == null && !this.queue.isEmpty()) {
            this.parkAWhile();
        }
        for (Processor processor : this.processors) {
            processor.shutDown = true;
        }
        while (awaitAllCompleted && this.shutDownCause == null && this.anyAlive()) {
            this.parkAWhile();
        }
    }

    /**
     * Remembers {@code failure} as the shutdown cause unless one is already recorded. Does not
     * take the executor monitor, so a processor can publish its failure even while another
     * thread is waiting inside {@link #shutdown0(boolean, Throwable)}. The check-then-set is not
     * atomic; if two tasks fail at the same time either failure may end up recorded.
     */
    private void recordFailure(Throwable failure) {
        if (this.shutDownCause == null) {
            this.shutDownCause = failure;
        }
    }

    /**
     * @return {@code true} if at least one current processor thread is still alive
     */
    private boolean anyAlive() {
        for (Processor processor : this.processors) {
            if (processor.isAlive()) {
                return true;
            }
        }
        return false;
    }

    /** Makes the calling thread wait according to the park strategy. */
    private void parkAWhile() {
        this.parkStrategy.park(Thread.currentThread());
    }

    /**
     * Worker thread that polls the queue and runs tasks until it is flagged to shut down.
     * The thread starts itself on construction.
     */
    private class Processor
    extends Thread {
        // Set by the enclosing executor to make this thread leave its loop.
        private volatile boolean shutDown;

        Processor(String name) {
            super(name);
            this.setUncaughtExceptionHandler(SILENT_UNCAUGHT_EXCEPTION_HANDLER);
            this.start();
        }

        @Override
        public void run() {
            while (!this.shutDown) {
                Callable<?> task = DynamicTaskExecutor.this.queue.poll();
                if (task == null) {
                    // Nothing to do right now; wait before polling again.
                    DynamicTaskExecutor.this.parkAWhile();
                    continue;
                }
                try {
                    task.call();
                }
                catch (Throwable e) {
                    // Publish the failure before asking for the executor monitor: a thread
                    // waiting in shutdown0 holds that monitor and only lets go once it sees
                    // the failure.
                    DynamicTaskExecutor.this.recordFailure(e);
                    DynamicTaskExecutor.this.shutdown0(false, e);
                    throw Exceptions.launderedException(e);
                }
            }
        }
    }
}

