/*
 * Decompiled with CFR 0.152.
 */
package org.jruby.ext.thread;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyMarshal;
import org.jruby.RubyNumeric;
import org.jruby.RubyObject;
import org.jruby.RubyThread;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.exceptions.RaiseException;
import org.jruby.runtime.ObjectAllocator;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.Visibility;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.runtime.marshal.DataType;

/**
 * Ruby-visible {@code Queue}: a FIFO linked list of {@link IRubyObject}s whose
 * producers and consumers are guarded by two separate locks ({@link #putLock}
 * and {@link #takeLock}), with the element count kept in an
 * {@link AtomicInteger} so both sides can read it without holding the other
 * side's lock.
 *
 * <p>The list always starts with a sentinel node ({@link #head}) whose
 * {@code item} is {@code null}; the first real element is {@code head.next}.
 * The overall structure resembles {@code java.util.concurrent.LinkedBlockingQueue},
 * extended with a {@link #closed} flag.
 *
 * <p>An instance is considered uninitialized until {@link #initialize(ThreadContext)}
 * gives it a non-zero {@link #capacity}; most Ruby-facing methods call
 * {@link #initializedCheck()} first.
 */
@JRubyClass(name={"Queue"})
public class Queue
extends RubyObject
implements DataType {
    /** Becomes {@code true} in {@link #closeInternal()}; this class never sets it back. */
    protected volatile boolean closed = false;
    /** Maximum element count. Stays 0 until {@link #initialize(ThreadContext)} runs. */
    protected volatile int capacity;
    /** Current number of elements in the list. */
    protected final AtomicInteger count = new AtomicInteger();
    /** Sentinel node; {@code head.item} is always {@code null}. */
    transient Node head;
    /** Tail of the list; new nodes are linked after it. */
    protected transient Node last;
    /** Held by consumers ({@link #takeInternal}, {@link #pollInternal}) while unlinking. */
    protected final ReentrantLock takeLock = new ReentrantLock();
    /** Consumers wait here while the queue is empty and not closed. */
    protected final Condition notEmpty = this.takeLock.newCondition();
    /** Held by producers ({@link #putInternal}) while linking. */
    protected final ReentrantLock putLock = new ReentrantLock();
    /** Producers wait here while the queue is at capacity and not closed. */
    protected final Condition notFull = this.putLock.newCondition();

    /** Task handed to {@code RubyThread.executeTask} for a blocking pop. */
    private static final RubyThread.Task<Queue, IRubyObject> BLOCKING_POP_TASK = new RubyThread.Task<Queue, IRubyObject>(){

        @Override
        public IRubyObject run(ThreadContext context, Queue queue) throws InterruptedException {
            return queue.takeInternal(context);
        }

        @Override
        public void wakeup(RubyThread thread, Queue queue) {
            // Interrupting the native thread breaks it out of lockInterruptibly()/await().
            thread.getNativeThread().interrupt();
        }
    };

    /** Task handed to {@code RubyThread.executeTask} for a non-blocking pop. */
    private static final RubyThread.Task<Queue, IRubyObject> NONBLOCKING_POP_TASK = new RubyThread.Task<Queue, IRubyObject>(){

        @Override
        public IRubyObject run(ThreadContext context, Queue queue) throws InterruptedException {
            IRubyObject polled = queue.pollInternal();
            if (polled == null) {
                // pollInternal() reports "nothing available" as null.
                throw context.runtime.newThreadError("queue empty");
            }
            return polled;
        }

        @Override
        public void wakeup(RubyThread thread, Queue queue) {
            thread.getNativeThread().interrupt();
        }
    };

    /**
     * Wakes one consumer waiting on {@link #notEmpty}. Acquires {@link #takeLock}
     * itself, so it is meant to be called by code that does not already hold it
     * (producers call it after releasing {@link #putLock}).
     */
    protected void signalNotEmpty() {
        ReentrantLock lock = this.takeLock;
        lock.lock();
        try {
            this.notEmpty.signal();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Wakes one producer waiting on {@link #notFull}. Acquires {@link #putLock}
     * itself (consumers call it after releasing {@link #takeLock}).
     */
    protected void signalNotFull() {
        ReentrantLock lock = this.putLock;
        lock.lock();
        try {
            this.notFull.signal();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Links {@code node} after the current tail and makes it the new tail.
     * Performs no locking and does not touch {@link #count}; the caller does both.
     *
     * @param node node to append
     */
    protected void enqueue(Node node) {
        this.last.next = node;
        this.last = node;
    }

    /**
     * Unlinks the first element and returns its item. The node that held the
     * item becomes the new sentinel. Performs no locking and no emptiness
     * check; the caller must ensure {@code head.next} is non-null.
     *
     * @return the item of the former first element
     */
    protected IRubyObject dequeue() {
        Node oldHead = this.head;
        Node first = oldHead.next;
        // Self-link the retired sentinel (same idiom as LinkedBlockingQueue).
        oldHead.next = oldHead;
        this.head = first;
        IRubyObject item = first.item;
        // The new sentinel must not keep a reference to the returned item.
        first.item = null;
        return item;
    }

    /** Acquires {@link #putLock} then {@link #takeLock}, in that order. */
    protected void fullyLock() {
        this.putLock.lock();
        this.takeLock.lock();
    }

    /** Releases {@link #takeLock} then {@link #putLock} (reverse of {@link #fullyLock()}). */
    protected void fullyUnlock() {
        this.takeLock.unlock();
        this.putLock.unlock();
    }

    /**
     * Raises a Ruby TypeError if {@link #capacity} is still 0, i.e. if
     * {@link #initialize(ThreadContext)} has not assigned it.
     */
    protected void initializedCheck() {
        if (this.capacity == 0) {
            throw this.getRuntime().newTypeError(this + " not initialized");
        }
    }

    /**
     * Creates an empty queue consisting of only the sentinel node. The capacity
     * is left at 0 until {@link #initialize(ThreadContext)} is called.
     */
    public Queue(Ruby runtime, RubyClass type) {
        super(runtime, type);
        this.last = this.head = new Node(null);
    }

    /**
     * Registers the Ruby classes backed by this implementation: defines
     * {@code Queue} under the class returned by {@code runtime.getThread()}
     * and also binds it as the {@code Queue} constant on Object; defines
     * {@code ClosedQueueError} (a StopIteration subclass) under {@code Queue}
     * and also binds it as a constant on Object.
     *
     * @param runtime the runtime to install the classes into
     */
    public static void setup(Ruby runtime) {
        RubyClass cQueue = runtime.getThread().defineClassUnder("Queue", runtime.getObject(), new ObjectAllocator(){

            @Override
            public IRubyObject allocate(Ruby allocRuntime, RubyClass klass) {
                return new Queue(allocRuntime, klass);
            }
        });
        cQueue.undefineMethod("initialize_copy");
        cQueue.setReifiedClass(Queue.class);
        cQueue.defineAnnotatedMethods(Queue.class);
        runtime.getObject().setConstant("Queue", cQueue);
        RubyClass cClosedQueueError = cQueue.defineClassUnder("ClosedQueueError", runtime.getStopIteration(), runtime.getStopIteration().getAllocator());
        runtime.getObject().setConstant("ClosedQueueError", cClosedQueueError);
    }

    /**
     * Ruby {@code initialize}: sets the capacity to {@link Integer#MAX_VALUE},
     * which also marks the queue as initialized for {@link #initializedCheck()}.
     *
     * @return this queue
     */
    @Override
    @JRubyMethod(visibility=Visibility.PRIVATE)
    public IRubyObject initialize(ThreadContext context) {
        this.capacity = Integer.MAX_VALUE;
        return this;
    }

    /**
     * Ruby {@code clear}: removes every element.
     *
     * @return this queue
     * @throws RaiseException ThreadError if interrupted while acquiring the locks
     */
    @JRubyMethod
    public IRubyObject clear(ThreadContext context) {
        this.initializedCheck();
        try {
            this.clearInternal();
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "clear");
        }
        return this;
    }

    /**
     * Empties the list while holding both locks (put lock first, then take
     * lock), resets {@link #count} to 0 and, if the queue had been exactly at
     * capacity, signals one waiting producer.
     *
     * @throws InterruptedException if interrupted while acquiring either lock
     */
    protected void clearInternal() throws InterruptedException {
        ReentrantLock putLock = this.putLock;
        ReentrantLock takeLock = this.takeLock;
        putLock.lockInterruptibly();
        try {
            takeLock.lockInterruptibly();
            try {
                Node current = this.head;
                Node next;
                while ((next = current.next) != null) {
                    // Detach each visited node and drop its item reference.
                    current.next = current;
                    next.item = null;
                    current = next;
                }
                // The old tail (item already nulled above) becomes the new sentinel.
                this.head = this.last;
                if (this.count.getAndSet(0) == this.capacity) {
                    this.notFull.signal();
                }
            }
            finally {
                takeLock.unlock();
            }
        }
        finally {
            putLock.unlock();
        }
    }

    /**
     * Ruby {@code empty?}.
     *
     * @return Ruby true when the element count is 0
     */
    @JRubyMethod(name={"empty?"})
    public RubyBoolean empty_p(ThreadContext context) {
        this.initializedCheck();
        return context.runtime.newBoolean(this.count.get() == 0);
    }

    /**
     * Ruby {@code length} / {@code size}.
     *
     * @return the current element count
     */
    @JRubyMethod(name={"length", "size"})
    public RubyNumeric length(ThreadContext context) {
        this.initializedCheck();
        return RubyNumeric.int2fix(context.runtime, this.count.get());
    }

    /**
     * Ruby {@code num_waiting}: the number of threads currently waiting on
     * {@link #notEmpty}, as estimated by
     * {@link ReentrantLock#getWaitQueueLength(Condition)}.
     *
     * @return the waiter count as a Fixnum
     * @throws RaiseException ThreadError if interrupted while acquiring the take lock
     */
    @JRubyMethod
    public RubyNumeric num_waiting(ThreadContext context) {
        this.initializedCheck();
        ReentrantLock takeLock = this.takeLock;
        try {
            // getWaitQueueLength() requires the lock to be held by the caller.
            takeLock.lockInterruptibly();
            try {
                return context.runtime.newFixnum(takeLock.getWaitQueueLength(this.notEmpty));
            }
            finally {
                // Released exactly once, and only after a successful acquire.
                takeLock.unlock();
            }
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "num_waiting");
        }
    }

    /**
     * Ruby {@code pop} / {@code deq} / {@code shift} without arguments: runs
     * {@link #takeInternal(ThreadContext)} through the current thread's
     * {@code executeTask}.
     *
     * @return the task's result
     * @throws RaiseException ThreadError if an InterruptedException escapes the task
     */
    @JRubyMethod(name={"pop", "deq", "shift"})
    public IRubyObject pop(ThreadContext context) {
        this.initializedCheck();
        try {
            return context.getThread().executeTask(context, this, BLOCKING_POP_TASK);
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "pop");
        }
    }

    /**
     * Ruby {@code pop} / {@code deq} / {@code shift} with a flag: a truthy
     * {@code arg0} selects the non-blocking task (which raises ThreadError
     * "queue empty" when nothing is available); otherwise the blocking task
     * is used.
     *
     * @param arg0 non-blocking flag
     * @return the task's result
     * @throws RaiseException ThreadError if an InterruptedException escapes the task
     */
    @JRubyMethod(name={"pop", "deq", "shift"})
    public IRubyObject pop(ThreadContext context, IRubyObject arg0) {
        this.initializedCheck();
        try {
            return context.getThread().executeTask(context, this, !arg0.isTrue() ? BLOCKING_POP_TASK : NONBLOCKING_POP_TASK);
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "pop");
        }
    }

    /**
     * Ruby {@code push} / {@code <<} / {@code enq}: appends {@code value}.
     *
     * @param value element to append
     * @return this queue
     * @throws RaiseException ClosedQueueError if the queue is closed; ThreadError if interrupted
     */
    @JRubyMethod(name={"push", "<<", "enq"})
    public IRubyObject push(ThreadContext context, IRubyObject value) {
        this.initializedCheck();
        try {
            this.putInternal(context, value);
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "push");
        }
        return this;
    }

    /**
     * Appends {@code e} under {@link #putLock}, waiting on {@link #notFull}
     * while the queue is at capacity and not closed.
     *
     * @param e element to append; must not be {@code null}
     * @throws NullPointerException if {@code e} is {@code null}
     * @throws RaiseException ClosedQueueError if the queue is (or becomes) closed
     * @throws InterruptedException if interrupted while locking or waiting
     */
    protected void putInternal(ThreadContext context, IRubyObject e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        int previousCount = -1;
        Node node = new Node(e);
        ReentrantLock putLock = this.putLock;
        AtomicInteger counter = this.count;
        putLock.lockInterruptibly();
        try {
            boolean isClosed;
            while (!(isClosed = this.closed) && counter.get() >= this.capacity) {
                this.notFull.await();
            }
            if (isClosed) {
                // Pass the wake-up on to the next waiting producer before raising.
                this.notFull.signal();
                this.raiseClosedError(context);
            }
            this.enqueue(node);
            previousCount = counter.getAndIncrement();
            if (previousCount + 1 < this.capacity) {
                // Still room: let another waiting producer proceed.
                this.notFull.signal();
            }
        }
        finally {
            putLock.unlock();
        }
        if (previousCount == 0) {
            // Queue went from empty to non-empty: wake a waiting consumer.
            this.signalNotEmpty();
        }
    }

    /**
     * Ruby {@code marshal_dump}: delegates to {@code RubyMarshal.undumpable}
     * (presumably rejecting the dump; confirm against RubyMarshal).
     */
    @JRubyMethod
    public IRubyObject marshal_dump(ThreadContext context) {
        return RubyMarshal.undumpable(context, this);
    }

    /**
     * Ruby {@code close}: marks the queue closed.
     *
     * @return this queue
     * @throws RaiseException ThreadError if interrupted while acquiring the locks
     */
    @JRubyMethod
    public IRubyObject close(ThreadContext context) {
        this.initializedCheck();
        try {
            this.closeInternal();
        }
        catch (InterruptedException ie) {
            throw this.createInterruptedError(context, "close");
        }
        return this;
    }

    /**
     * Sets {@link #closed} while holding both locks (put lock first, then take
     * lock). Does nothing if already closed. After closing, signals one
     * producer if the queue is at capacity, or one consumer if it is empty;
     * the woken thread re-signals its own condition (see
     * {@link #putInternal} and {@link #takeInternal}).
     *
     * @throws InterruptedException if interrupted while acquiring either lock
     */
    protected void closeInternal() throws InterruptedException {
        ReentrantLock putLock = this.putLock;
        ReentrantLock takeLock = this.takeLock;
        AtomicInteger counter = this.count;
        putLock.lockInterruptibly();
        try {
            takeLock.lockInterruptibly();
            try {
                if (this.closed) {
                    return;
                }
                this.closed = true;
                int current = counter.get();
                if (current >= this.capacity) {
                    this.notFull.signal();
                } else if (current == 0) {
                    this.notEmpty.signal();
                }
            }
            finally {
                takeLock.unlock();
            }
        }
        finally {
            putLock.unlock();
        }
    }

    /**
     * Ruby {@code closed?}.
     *
     * @return Ruby true when the queue has been closed
     */
    @JRubyMethod(name={"closed?"})
    public IRubyObject closed_p(ThreadContext context) {
        this.initializedCheck();
        return context.runtime.newBoolean(this.closed);
    }

    /**
     * Java-side close; equivalent to {@link #closeInternal()} but additionally
     * synchronized on this object.
     *
     * @throws InterruptedException if interrupted while acquiring the locks
     */
    public synchronized void shutdown() throws InterruptedException {
        this.closeInternal();
    }

    /** @return {@code true} when the queue has been closed */
    public boolean isShutdown() {
        return this.closed;
    }

    /**
     * Raises a Ruby ThreadError ("queue shut down") if the queue is closed.
     */
    public synchronized void checkShutdown() {
        if (this.isShutdown()) {
            Ruby runtime = this.getRuntime();
            throw RaiseException.from(runtime, runtime.getThreadError(), "queue shut down");
        }
    }

    /** @return the current element count, widened to {@code long} */
    protected long java_length() {
        return this.count.get();
    }

    /**
     * Blocking removal. Waits on {@link #notEmpty} while the queue is empty
     * and not closed, then either dequeues the first element or, when the
     * queue is closed and empty, returns {@code context.nil}.
     *
     * @return the removed element, or {@code context.nil} if closed and empty
     * @throws InterruptedException if interrupted while locking or waiting
     */
    protected IRubyObject takeInternal(ThreadContext context) throws InterruptedException {
        IRubyObject taken;
        int previousCount = -1;
        AtomicInteger counter = this.count;
        ReentrantLock takeLock = this.takeLock;
        boolean notFullSignalNeeded = false;
        takeLock.lockInterruptibly();
        try {
            boolean isClosed;
            while (!(isClosed = this.closed) && counter.get() == 0) {
                this.notEmpty.await();
            }
            // Not closed means the loop exited because count != 0; closed queues
            // may still hold elements that can be drained.
            boolean canDequeue = !isClosed || counter.get() != 0;
            if (canDequeue) {
                taken = this.dequeue();
                previousCount = counter.getAndDecrement();
            } else {
                taken = context.nil;
            }
            if (previousCount > 1 || isClosed) {
                // More elements remain, or the queue is closed: wake the next consumer.
                this.notEmpty.signal();
            }
            if (canDequeue) {
                notFullSignalNeeded = previousCount == this.capacity;
            }
        }
        finally {
            takeLock.unlock();
        }
        if (notFullSignalNeeded) {
            // Queue was full before this removal: wake a waiting producer.
            this.signalNotFull();
        }
        return taken;
    }

    /**
     * Non-blocking removal. Returns {@code null} immediately when the queue is
     * observed empty, either before or after acquiring {@link #takeLock}.
     *
     * @return the removed element, or {@code null} if none was available
     * @throws InterruptedException if interrupted while acquiring the take lock
     */
    public IRubyObject pollInternal() throws InterruptedException {
        AtomicInteger counter = this.count;
        if (counter.get() == 0) {
            // Cheap unlocked check to avoid taking the lock for an empty queue.
            return null;
        }
        IRubyObject polled = null;
        boolean notFullSignalNeeded = false;
        ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // Re-check under the lock: another consumer may have drained the queue.
            if (counter.get() > 0) {
                polled = this.dequeue();
                int previousCount = counter.getAndDecrement();
                if (previousCount > 1) {
                    this.notEmpty.signal();
                }
                notFullSignalNeeded = previousCount == this.capacity;
            }
        }
        finally {
            takeLock.unlock();
        }
        if (notFullSignalNeeded) {
            this.signalNotFull();
        }
        return polled;
    }

    /**
     * Always throws a Ruby {@code ClosedQueueError} with message "queue closed";
     * the declared return type exists only so callers can write
     * {@code return raiseClosedError(context)}.
     *
     * @return never returns normally
     */
    public IRubyObject raiseClosedError(ThreadContext context) {
        throw context.runtime.newRaiseException(context.runtime.getClass("ClosedQueueError"), "queue closed");
    }

    /**
     * Builds (does not throw) a Ruby ThreadError whose message is
     * {@code "interrupted in <class name>#<methodName>"}.
     *
     * @param methodName Ruby-level method name to mention in the message
     * @return the exception for the caller to throw
     */
    protected RaiseException createInterruptedError(ThreadContext context, String methodName) {
        return context.runtime.newThreadError("interrupted in " + this.getMetaClass().getName() + "#" + methodName);
    }

    /** Singly linked list node; the sentinel node carries a {@code null} item. */
    static class Node {
        IRubyObject item;
        Node next;

        Node(IRubyObject x) {
            this.item = x;
        }
    }
}

