/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.util;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.util.Queue;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.ReusableThread;
import org.jgroups.util.SchedulerListener;
import org.jgroups.util.ThreadPool;
import org.jgroups.util.Util;

public class Scheduler
implements Runnable {
    final Queue queue = new Queue();
    Thread sched_thread = null;
    Task current_task = null;
    ThreadPool pool = null;
    SchedulerListener listener = null;
    protected static final Log log = LogFactory.getLog((Class)Scheduler.class);
    boolean concurrent_processing = false;
    int NUM_THREADS = 128;
    static final int WAIT_FOR_THREAD_AVAILABILITY = 3000;
    static final int THREAD_JOIN_TIMEOUT = 1000;

    public Scheduler() {
        String tmp = Util.getProperty(new String[]{"jgroups.scheduler.max_threads"}, null, null, false, "128");
        this.NUM_THREADS = Integer.parseInt(tmp);
    }

    public Scheduler(int num_threads) {
        this.NUM_THREADS = num_threads;
    }

    public void setListener(SchedulerListener l) {
        this.listener = l;
    }

    public boolean getConcurrentProcessing() {
        return this.concurrent_processing;
    }

    public void setConcurrentProcessing(boolean process_concurrently) {
        this.concurrent_processing = process_concurrently;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        while (this.sched_thread != null && !this.queue.closed()) {
            try {
                this.current_task = (Task)this.queue.peek();
                if (this.current_task == null) {
                    if (!log.isWarnEnabled()) continue;
                    log.warn((Object)("current task is null, queue.size()=" + this.queue.size() + ", queue.closed()=" + this.queue.closed() + ", continuing"));
                    continue;
                }
                if (this.current_task.suspended) {
                    this.current_task.suspended = false;
                    this.current_task.thread.resume();
                    if (this.listener != null) {
                        this.listener.resumed(this.current_task.target);
                    }
                } else {
                    if (this.current_task.thread == null) {
                        this.current_task.thread = this.pool.getThread();
                        if (this.current_task.thread == null) {
                            if (log.isWarnEnabled()) {
                                log.warn((Object)"thread pool exhausted, waiting for 3000ms before retrying");
                            }
                            Util.sleep(3000L);
                            continue;
                        }
                    }
                    if (this.listener != null) {
                        this.listener.started(this.current_task.target);
                    }
                    if (!this.current_task.thread.assignTask(this.current_task.target)) continue;
                }
                if (this.sched_thread.isInterrupted()) {
                    throw new InterruptedException();
                }
                if (!this.concurrent_processing) {
                    ReusableThread reusableThread = this.current_task.thread;
                    synchronized (reusableThread) {
                        while (!this.current_task.thread.done() && !this.current_task.thread.suspended) {
                            this.current_task.thread.wait();
                        }
                    }
                    if (this.listener != null) {
                        this.listener.stopped(this.current_task.target);
                    }
                }
                this.queue.removeElement(this.current_task);
            }
            catch (InterruptedException interrupted) {
                if (this.sched_thread == null || this.queue.closed()) break;
                if (this.current_task.thread != null) {
                    this.current_task.thread.suspend();
                    if (this.listener != null) {
                        this.listener.suspended(this.current_task.target);
                    }
                    this.current_task.suspended = true;
                }
                Thread.interrupted();
            }
            catch (QueueClosedException closed_ex) {
                return;
            }
            catch (Throwable ex) {
                if (!log.isErrorEnabled()) continue;
                log.error((Object)("exception=" + Util.print(ex)));
            }
        }
        if (log.isTraceEnabled()) {
            log.trace((Object)"scheduler thread terminated");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void addPrio(Runnable task) {
        block8: {
            Task new_task = new Task(task);
            boolean do_interrupt = false;
            try {
                Queue queue = this.queue;
                synchronized (queue) {
                    if (this.queue.size() == 0) {
                        this.queue.add(new_task);
                    } else {
                        this.queue.addAtHead(new_task);
                        do_interrupt = true;
                    }
                }
                if (do_interrupt) {
                    this.sched_thread.interrupt();
                }
            }
            catch (Throwable e) {
                if (!log.isErrorEnabled()) break block8;
                log.error((Object)("exception=" + e));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void add(Runnable task) {
        block5: {
            Task new_task = new Task(task);
            try {
                Queue queue = this.queue;
                synchronized (queue) {
                    this.queue.add(new_task);
                }
            }
            catch (Exception e) {
                if (!log.isErrorEnabled()) break block5;
                log.error((Object)("exception=" + e));
            }
        }
    }

    public void start() {
        if (this.queue.closed()) {
            this.queue.reset();
        }
        if (this.sched_thread == null) {
            this.pool = new ThreadPool(this.NUM_THREADS);
            this.sched_thread = new Thread((Runnable)this, "Scheduler main thread");
            this.sched_thread.setDaemon(true);
            this.sched_thread.start();
        }
    }

    public void stop() {
        Thread tmp = null;
        this.queue.close(false);
        if (this.sched_thread != null && this.sched_thread.isAlive()) {
            tmp = this.sched_thread;
            this.sched_thread = null;
            tmp.interrupt();
            try {
                tmp.join(1000L);
            }
            catch (Exception exception) {
                // empty catch block
            }
            if (tmp.isAlive() && log.isErrorEnabled()) {
                log.error((Object)"scheduler thread is still not dead  !!!");
            }
        }
        this.sched_thread = null;
        if (this.pool != null) {
            this.pool.destroy();
            this.pool = null;
        }
    }

    public static class Task {
        ReusableThread thread = null;
        Runnable target = null;
        boolean suspended = false;

        Task(Runnable target) {
            this.target = target;
        }

        public String toString() {
            return "[thread=" + this.thread + ", target=" + this.target + ", suspended=" + this.suspended + ']';
        }
    }
}

