/*
 * Decompiled with CFR 0.152.
 */
package com.aol.cyclops.data.async;

import com.aol.cyclops.control.ReactiveSeq;
import com.aol.cyclops.data.async.Adapter;
import com.aol.cyclops.data.async.AdaptersModule;
import com.aol.cyclops.data.async.ContinuationStrategy;
import com.aol.cyclops.data.async.QueueFactory;
import com.aol.cyclops.data.async.Signal;
import com.aol.cyclops.data.async.Topic;
import com.aol.cyclops.data.async.wait.DirectWaitStrategy;
import com.aol.cyclops.data.async.wait.WaitStrategy;
import com.aol.cyclops.internal.react.exceptions.SimpleReactProcessingException;
import com.aol.cyclops.react.async.subscription.AlwaysContinue;
import com.aol.cyclops.react.async.subscription.Continueable;
import com.aol.cyclops.types.futurestream.Continuation;
import com.aol.cyclops.util.ExceptionSoftener;
import com.aol.cyclops.util.SimpleTimer;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class Queue<T>
implements Adapter<T> {
    private static final PoisonPill POISON_PILL = new PoisonPill();
    private static final PoisonPill CLEAR_PILL = new PoisonPill();
    private volatile boolean open = true;
    private final AtomicInteger listeningStreams = new AtomicInteger();
    private final int timeout;
    private final TimeUnit timeUnit;
    private final long offerTimeout;
    private final TimeUnit offerTimeUnit;
    private final int maxPoisonPills;
    private final BlockingQueue<T> queue;
    private final WaitStrategy<T> consumerWait;
    private final WaitStrategy<T> producerWait;
    private volatile Signal<Integer> sizeSignal;
    private volatile Continueable sub;
    private ContinuationStrategy continuationStrategy;
    private volatile boolean shuttingDown = false;
    public static final NIL NILL = new NIL();

    public Queue() {
        this(new LinkedBlockingQueue());
    }

    public Queue(QueueFactory<T> factory2) {
        Queue<T> q = factory2.build();
        this.queue = q.queue;
        this.timeout = q.timeout;
        this.timeUnit = q.timeUnit;
        this.maxPoisonPills = q.maxPoisonPills;
        this.offerTimeout = q.offerTimeout;
        this.offerTimeUnit = q.offerTimeUnit;
        this.consumerWait = q.consumerWait;
        this.producerWait = q.producerWait;
    }

    Queue(BlockingQueue<T> queue, WaitStrategy<T> consumer, WaitStrategy<T> producer) {
        this.queue = queue;
        this.timeout = -1;
        this.timeUnit = TimeUnit.MILLISECONDS;
        this.maxPoisonPills = 90000;
        this.offerTimeout = Integer.MAX_VALUE;
        this.offerTimeUnit = TimeUnit.DAYS;
        this.consumerWait = consumer;
        this.producerWait = producer;
    }

    public Queue(BlockingQueue<T> queue) {
        this(queue, (WaitStrategy<T>)new DirectWaitStrategy(), (WaitStrategy<T>)new DirectWaitStrategy());
    }

    Queue(BlockingQueue<T> queue, Signal<Integer> sizeSignal) {
        this(queue, (WaitStrategy<T>)new DirectWaitStrategy(), (WaitStrategy<T>)new DirectWaitStrategy());
    }

    public Queue(java.util.Queue<T> q, WaitStrategy<T> consumer, WaitStrategy<T> producer) {
        this(new AdaptersModule.QueueToBlockingQueueWrapper(q), consumer, producer);
    }

    public static <T> Queue<T> createMergeQueue() {
        Queue<T> q = new Queue<T>();
        q.continuationStrategy = new AdaptersModule.StreamOfContinuations(q);
        return q;
    }

    @Override
    public ReactiveSeq<T> stream() {
        this.listeningStreams.incrementAndGet();
        return ReactiveSeq.fromStream(this.closingStream(this::get, new AlwaysContinue()));
    }

    @Override
    public ReactiveSeq<T> stream(Continueable s) {
        this.sub = s;
        this.listeningStreams.incrementAndGet();
        return ReactiveSeq.fromStream(this.closingStream(this::get, s));
    }

    public ReactiveSeq<Collection<T>> streamBatchNoTimeout(Continueable s, Function<Supplier<T>, Supplier<Collection<T>>> batcher) {
        this.sub = s;
        this.listeningStreams.incrementAndGet();
        return ReactiveSeq.fromStream(this.closingStreamBatch(batcher.apply(() -> this.ensureOpen(this.timeout, this.timeUnit)), s));
    }

    public ReactiveSeq<Collection<T>> streamBatch(Continueable s, Function<BiFunction<Long, TimeUnit, T>, Supplier<Collection<T>>> batcher) {
        this.sub = s;
        this.listeningStreams.incrementAndGet();
        return ReactiveSeq.fromStream(this.closingStreamBatch(batcher.apply((timeout, timeUnit) -> this.ensureOpen((long)timeout, (TimeUnit)((Object)timeUnit))), s));
    }

    public ReactiveSeq<T> streamControl(Continueable s, Function<Supplier<T>, Supplier<T>> batcher) {
        this.listeningStreams.incrementAndGet();
        return ReactiveSeq.fromStream(this.closingStream(batcher.apply(() -> this.ensureOpen(this.timeout, this.timeUnit)), s));
    }

    public ReactiveSeq<CompletableFuture<T>> streamControlFutures(Continueable s, Function<Supplier<T>, CompletableFuture<T>> batcher) {
        this.sub = s;
        this.listeningStreams.incrementAndGet();
        return ReactiveSeq.fromStream(this.closingStreamFutures(() -> (CompletableFuture)batcher.apply(() -> this.ensureOpen(this.timeout, this.timeUnit)), s));
    }

    private Stream<Collection<T>> closingStreamBatch(Supplier<Collection<T>> s, Continueable sub) {
        Stream<Collection<T>> st = StreamSupport.stream(new AdaptersModule.ClosingSpliterator<Collection<T>>(Long.MAX_VALUE, s, sub, this), false);
        return st;
    }

    private Stream<T> closingStream(Supplier<T> s, Continueable sub) {
        Stream<T> st = StreamSupport.stream(new AdaptersModule.ClosingSpliterator<T>(Long.MAX_VALUE, s, sub, this), false);
        return st;
    }

    private Stream<CompletableFuture<T>> closingStreamFutures(Supplier<CompletableFuture<T>> s, Continueable sub) {
        Stream<CompletableFuture<T>> st = StreamSupport.stream(new AdaptersModule.ClosingSpliterator<CompletableFuture<T>>(Long.MAX_VALUE, s, sub, this), false);
        return st;
    }

    @Override
    public ReactiveSeq<CompletableFuture<T>> streamCompletableFutures() {
        return this.stream().map(CompletableFuture::completedFuture);
    }

    @Override
    public boolean fromStream(Stream<T> stream) {
        stream.collect(Collectors.toCollection(() -> this.queue));
        return true;
    }

    private T ensureOpen(long timeout, TimeUnit timeUnit) {
        if (!this.open && this.queue.size() == 0) {
            throw new ClosedQueueException();
        }
        SimpleTimer timer = new SimpleTimer();
        long timeoutNanos = timeUnit.toNanos(timeout);
        Object data = null;
        try {
            if (this.continuationStrategy != null) {
                while (this.open) {
                    Object e = this.ensureClear(this.queue.poll());
                    data = e;
                    if (e != null) break;
                    this.continuationStrategy.handleContinuation();
                    if (timeout == -1L) continue;
                    this.handleTimeout(timer, timeoutNanos);
                }
                if (data != null) {
                    return (T)this.nillSafe(this.ensureNotPoisonPill(this.ensureClear(data)));
                }
            }
            if (!this.open && this.queue.size() == 0) {
                throw new ClosedQueueException();
            }
            if (timeout == -1L) {
                if (this.sub != null && this.sub.timeLimit() > -1L) {
                    data = this.ensureClear(this.consumerWait.take(() -> this.queue.poll(this.sub.timeLimit(), TimeUnit.NANOSECONDS)));
                    if (data == null) {
                        throw new QueueTimeoutException();
                    }
                } else {
                    data = this.ensureClear(this.consumerWait.take(() -> this.queue.take()));
                }
            } else {
                data = this.ensureClear(this.consumerWait.take(() -> this.queue.poll(timeout, timeUnit)));
                if (data == null) {
                    throw new QueueTimeoutException();
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw ExceptionSoftener.throwSoftenedException(e);
        }
        this.ensureNotPoisonPill(data);
        if (this.sizeSignal != null) {
            this.sizeSignal.set(this.queue.size());
        }
        return (T)this.nillSafe(data);
    }

    private void handleTimeout(SimpleTimer timer, long timeout) {
        if (timer.getElapsedNanoseconds() > timeout) {
            throw new QueueTimeoutException();
        }
    }

    private T ensureClear(T poll) {
        if (CLEAR_PILL == poll) {
            if (this.queue.size() > 0) {
                poll = this.ensureClear(this.queue.poll());
            }
            this.queue.clear();
        }
        return poll;
    }

    private T ensureNotPoisonPill(T data) {
        if (data instanceof PoisonPill) {
            throw new ClosedQueueException();
        }
        return data;
    }

    public T poll(long time, TimeUnit unit) throws QueueTimeoutException {
        return this.ensureOpen(time, unit);
    }

    public T get() {
        return this.ensureOpen(this.timeout, this.timeUnit);
    }

    public boolean add(T data) {
        try {
            boolean result = this.queue.add(this.nullSafe(data));
            if (result && this.sizeSignal != null) {
                this.sizeSignal.set(this.queue.size());
            }
            return result;
        }
        catch (IllegalStateException e) {
            return false;
        }
    }

    @Override
    public boolean offer(T data) {
        if (!this.open) {
            throw new ClosedQueueException();
        }
        try {
            boolean result = this.producerWait.offer(() -> this.queue.offer(this.nullSafe(data), this.offerTimeout, this.offerTimeUnit));
            if (this.sizeSignal != null) {
                this.sizeSignal.set(this.queue.size());
            }
            return result;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw ExceptionSoftener.throwSoftenedException(e);
        }
    }

    private boolean timeout(SimpleTimer timer) {
        return timer.getElapsedNanoseconds() >= this.offerTimeUnit.toNanos(this.offerTimeout);
    }

    private Object nillSafe(T data) {
        if (NILL == data) {
            return null;
        }
        return data;
    }

    private Object nullSafe(T data) {
        if (data == null) {
            return NILL;
        }
        return data;
    }

    @Override
    public boolean close() {
        this.open = false;
        if (this.queue.remainingCapacity() > 0) {
            for (int i = 0; i < Math.min(this.maxPoisonPills, this.listeningStreams.get()); ++i) {
                this.add(POISON_PILL);
            }
        }
        return true;
    }

    public void closeAndClear() {
        this.open = false;
        this.add(CLEAR_PILL);
    }

    public int size() {
        return this.queue.size();
    }

    public boolean isOpen() {
        return this.open;
    }

    public void addContinuation(Continuation c) {
        if (this.continuationStrategy == null) {
            this.continuationStrategy = new AdaptersModule.SingleContinuation(this);
        }
        this.continuationStrategy.addContinuation(c);
    }

    @Override
    public <R> R visit(Function<? super Queue<T>, ? extends R> caseQueue, Function<? super Topic<T>, ? extends R> caseTopic) {
        return caseQueue.apply(this);
    }

    public Queue<T> withOpen(boolean open) {
        return this.open == open ? this : new Queue<T>(open, this.timeout, this.timeUnit, this.offerTimeout, this.offerTimeUnit, this.maxPoisonPills, this.queue, this.consumerWait, this.producerWait, this.sizeSignal, this.sub, this.continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withTimeout(int timeout) {
        return this.timeout == timeout ? this : new Queue<T>(this.open, timeout, this.timeUnit, this.offerTimeout, this.offerTimeUnit, this.maxPoisonPills, this.queue, this.consumerWait, this.producerWait, this.sizeSignal, this.sub, this.continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withTimeUnit(TimeUnit timeUnit) {
        return this.timeUnit == timeUnit ? this : new Queue<T>(this.open, this.timeout, timeUnit, this.offerTimeout, this.offerTimeUnit, this.maxPoisonPills, this.queue, this.consumerWait, this.producerWait, this.sizeSignal, this.sub, this.continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withOfferTimeout(long offerTimeout) {
        return this.offerTimeout == offerTimeout ? this : new Queue<T>(this.open, this.timeout, this.timeUnit, offerTimeout, this.offerTimeUnit, this.maxPoisonPills, this.queue, this.consumerWait, this.producerWait, this.sizeSignal, this.sub, this.continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withOfferTimeUnit(TimeUnit offerTimeUnit) {
        return this.offerTimeUnit == offerTimeUnit ? this : new Queue<T>(this.open, this.timeout, this.timeUnit, this.offerTimeout, offerTimeUnit, this.maxPoisonPills, this.queue, this.consumerWait, this.producerWait, this.sizeSignal, this.sub, this.continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withMaxPoisonPills(int maxPoisonPills) {
        return this.maxPoisonPills == maxPoisonPills ? this : new Queue<T>(this.open, this.timeout, this.timeUnit, this.offerTimeout, this.offerTimeUnit, maxPoisonPills, this.queue, this.consumerWait, this.producerWait, this.sizeSignal, this.sub, this.continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withQueue(BlockingQueue<T> queue) {
        return this.queue == queue ? this : new Queue<T>(this.open, this.timeout, this.timeUnit, this.offerTimeout, this.offerTimeUnit, this.maxPoisonPills, queue, this.consumerWait, this.producerWait, this.sizeSignal, this.sub, this.continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withConsumerWait(WaitStrategy<T> consumerWait) {
        return this.consumerWait == consumerWait ? this : new Queue<T>(this.open, this.timeout, this.timeUnit, this.offerTimeout, this.offerTimeUnit, this.maxPoisonPills, this.queue, consumerWait, this.producerWait, this.sizeSignal, this.sub, this.continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withProducerWait(WaitStrategy<T> producerWait) {
        return this.producerWait == producerWait ? this : new Queue<T>(this.open, this.timeout, this.timeUnit, this.offerTimeout, this.offerTimeUnit, this.maxPoisonPills, this.queue, this.consumerWait, producerWait, this.sizeSignal, this.sub, this.continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withSizeSignal(Signal<Integer> sizeSignal) {
        return this.sizeSignal == sizeSignal ? this : new Queue<T>(this.open, this.timeout, this.timeUnit, this.offerTimeout, this.offerTimeUnit, this.maxPoisonPills, this.queue, this.consumerWait, this.producerWait, sizeSignal, this.sub, this.continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withSub(Continueable sub) {
        return this.sub == sub ? this : new Queue<T>(this.open, this.timeout, this.timeUnit, this.offerTimeout, this.offerTimeUnit, this.maxPoisonPills, this.queue, this.consumerWait, this.producerWait, this.sizeSignal, sub, this.continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withContinuationStrategy(ContinuationStrategy continuationStrategy) {
        return this.continuationStrategy == continuationStrategy ? this : new Queue<T>(this.open, this.timeout, this.timeUnit, this.offerTimeout, this.offerTimeUnit, this.maxPoisonPills, this.queue, this.consumerWait, this.producerWait, this.sizeSignal, this.sub, continuationStrategy, this.shuttingDown);
    }

    public Queue<T> withShuttingDown(boolean shuttingDown) {
        return this.shuttingDown == shuttingDown ? this : new Queue<T>(this.open, this.timeout, this.timeUnit, this.offerTimeout, this.offerTimeUnit, this.maxPoisonPills, this.queue, this.consumerWait, this.producerWait, this.sizeSignal, this.sub, this.continuationStrategy, shuttingDown);
    }

    @ConstructorProperties(value={"open", "timeout", "timeUnit", "offerTimeout", "offerTimeUnit", "maxPoisonPills", "queue", "consumerWait", "producerWait", "sizeSignal", "sub", "continuationStrategy", "shuttingDown"})
    public Queue(boolean open, int timeout, TimeUnit timeUnit, long offerTimeout, TimeUnit offerTimeUnit, int maxPoisonPills, BlockingQueue<T> queue, WaitStrategy<T> consumerWait, WaitStrategy<T> producerWait, Signal<Integer> sizeSignal, Continueable sub, ContinuationStrategy continuationStrategy, boolean shuttingDown) {
        this.open = open;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.offerTimeout = offerTimeout;
        this.offerTimeUnit = offerTimeUnit;
        this.maxPoisonPills = maxPoisonPills;
        this.queue = queue;
        this.consumerWait = consumerWait;
        this.producerWait = producerWait;
        this.sizeSignal = sizeSignal;
        this.sub = sub;
        this.continuationStrategy = continuationStrategy;
        this.shuttingDown = shuttingDown;
    }

    BlockingQueue<T> getQueue() {
        return this.queue;
    }

    public Signal<Integer> getSizeSignal() {
        return this.sizeSignal;
    }

    public void setSizeSignal(Signal<Integer> sizeSignal) {
        this.sizeSignal = sizeSignal;
    }

    public static class QueueReader<T> {
        Queue<T> queue;
        private volatile T last = null;

        public boolean notEmpty() {
            return ((Queue)this.queue).queue.size() != 0;
        }

        private int size() {
            return ((Queue)this.queue).queue.size();
        }

        public T next() {
            this.last = ((Queue)this.queue).ensureOpen(((Queue)this.queue).timeout, ((Queue)this.queue).timeUnit);
            return this.last;
        }

        public boolean isOpen() {
            return ((Queue)this.queue).open || this.notEmpty();
        }

        public Collection<T> drainToOrBlock() {
            ArrayList<Object> result = new ArrayList<Object>();
            if (this.size() > 0) {
                ((Queue)this.queue).queue.drainTo(result);
            } else {
                try {
                    result.add(((Queue)this.queue).ensureOpen(((Queue)this.queue).timeout, ((Queue)this.queue).timeUnit));
                }
                catch (ClosedQueueException e) {
                    ((Queue)this.queue).open = false;
                    throw e;
                }
            }
            return result.stream().filter(it -> it != POISON_PILL).collect(Collectors.toList());
        }

        @ConstructorProperties(value={"queue", "last"})
        public QueueReader(Queue<T> queue, T last) {
            this.queue = queue;
            this.last = last;
        }

        public Queue<T> getQueue() {
            return this.queue;
        }

        public T getLast() {
            return this.last;
        }
    }

    public static class NIL {
    }

    private static class PoisonPill {
        private PoisonPill() {
        }
    }

    public static class QueueTimeoutException
    extends SimpleReactProcessingException {
        private static final long serialVersionUID = 1L;

        @Override
        public Throwable fillInStackTrace() {
            return this;
        }
    }

    public static class ClosedQueueException
    extends SimpleReactProcessingException {
        private static final long serialVersionUID = 1L;
        private final List currentData;

        public ClosedQueueException() {
            this.currentData = null;
        }

        public boolean isDataPresent() {
            return this.currentData != null;
        }

        @Override
        public Throwable fillInStackTrace() {
            return this;
        }

        @ConstructorProperties(value={"currentData"})
        public ClosedQueueException(List currentData) {
            this.currentData = currentData;
        }

        public List getCurrentData() {
            return this.currentData;
        }
    }
}

