/*
 * Decompiled with CFR 0.152.
 */
package mutiny.zero.internal;

import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;
import mutiny.zero.Tube;
import mutiny.zero.internal.Helper;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
 * Common state and signal-dispatch logic for {@link Tube} implementations that are also the
 * {@link Subscription} seen by the downstream {@link Subscriber}.
 *
 * <p>Items that subclasses place in {@link #dispatchQueue} are delivered by {@link #drainLoop()},
 * which uses the {@link #wip} counter so that only one caller at a time runs the delivery loop;
 * other callers just record that more work is pending and return.
 *
 * <p>How an incoming item is queued (or rejected) is left to subclasses through
 * {@link #handleItem(Object)}.
 *
 * @param <T> the type of the items flowing through the tube
 */
public abstract class TubeBase<T> implements Tube<T>, Subscription {

    /** The downstream subscriber receiving {@code onNext}, {@code onError} and {@code onComplete}. */
    protected final Subscriber<? super T> subscriber;

    /** Set once the subscription is cancelled or a terminal signal has been delivered. */
    protected volatile boolean cancelled;

    /** Work-in-progress counter guarding {@link #drainLoop()} against concurrent execution. */
    protected final AtomicInteger wip = new AtomicInteger();

    /** Demand requested by the subscriber and not yet satisfied. */
    protected final AtomicLong requested = new AtomicLong();

    /** Items waiting to be delivered to the subscriber. */
    protected final ConcurrentLinkedQueue<T> dispatchQueue = new ConcurrentLinkedQueue<>();

    /** The failure to signal downstream, or {@code null} when none was recorded. */
    protected volatile Throwable failure;

    /** Set by {@link #complete()}; completion is signalled once the queue has been drained. */
    protected volatile boolean completed = false;

    /** Runs after a terminal signal was delivered and at the end of {@link #cancel()}. */
    protected Runnable terminationAction = () -> {};

    /** Receives the amount of every valid {@link #request(long)} call. */
    protected LongConsumer requestConsumer = count -> {};

    /** Runs when a cancellation is processed. */
    protected Runnable cancellationAction = () -> {};

    /**
     * Creates a tube delivering its signals to the given subscriber.
     *
     * @param subscriber the downstream subscriber
     */
    protected TubeBase(Subscriber<? super T> subscriber) {
        this.subscriber = subscriber;
    }

    /**
     * Records additional demand and triggers a drain.
     *
     * <p>A non-positive {@code n} is turned into a failure built by
     * {@code Helper.negativeRequest(n)}. The call is ignored when the tube is already cancelled.
     *
     * @param n the number of additional items requested
     */
    @Override
    public void request(long n) {
        if (this.cancelled) {
            return;
        }
        if (n <= 0L) {
            this.fail(Helper.negativeRequest(n));
        } else {
            Helper.add(this.requested, n);
            this.requestConsumer.accept(n);
        }
        this.drainLoop();
    }

    /**
     * Cancels the subscription: marks the tube as cancelled, discards the queued items and runs
     * the cancellation action followed by the termination action. Does nothing when the tube is
     * already cancelled.
     */
    @Override
    public void cancel() {
        if (this.cancelled) {
            return;
        }
        this.cancelled = true;
        this.dispatchQueue.clear();
        this.cancellationAction.run();
        this.terminationAction.run();
    }

    /**
     * Offers an item to the tube.
     *
     * <p>The item is ignored when the tube is cancelled. A {@code null} item is converted into a
     * failure carrying a {@link NullPointerException}; any other item is handed to
     * {@link #handleItem(Object)}.
     *
     * @param item the item to send
     * @return this tube
     */
    @Override
    public Tube<T> send(T item) {
        if (this.cancelled()) {
            return this;
        }
        if (item == null) {
            this.fail(new NullPointerException("The item is null"));
        } else {
            this.handleItem(item);
        }
        return this;
    }

    /**
     * Records a failure and triggers a drain so that it gets signalled downstream. Ignored when
     * the tube is cancelled.
     *
     * @param err the failure; {@code null} is replaced by a {@link NullPointerException}
     */
    @Override
    public void fail(Throwable err) {
        if (this.cancelled) {
            return;
        }
        if (err == null) {
            err = new NullPointerException("The error is null");
        }
        this.failure = err;
        this.drainLoop();
    }

    /**
     * Marks the tube as completed and triggers a drain.
     *
     * <p>The "already completed" check comes first, so a repeated call throws even when the tube
     * has been cancelled in the meantime.
     *
     * @throws IllegalStateException if this method was already called on a non-cancelled tube
     */
    @Override
    public void complete() {
        if (this.completed) {
            throw new IllegalStateException("Already completed");
        }
        if (this.cancelled) {
            return;
        }
        this.completed = true;
        this.drainLoop();
    }

    /**
     * @return {@code true} once the tube was cancelled or has delivered a terminal signal
     */
    @Override
    public boolean cancelled() {
        return this.cancelled;
    }

    /**
     * @return the current value of the outstanding demand counter
     */
    @Override
    public long outstandingRequests() {
        return this.requested.get();
    }

    /**
     * Replaces the cancellation action.
     *
     * @param action the action to run on cancellation, must not be {@code null}
     * @return this tube
     * @throws NullPointerException if {@code action} is {@code null}
     */
    @Override
    public Tube<T> whenCancelled(Runnable action) {
        Objects.requireNonNull(action, "The cancellation action cannot be null");
        this.cancellationAction = action;
        return this;
    }

    /**
     * Replaces the termination action.
     *
     * @param action the action to run on termination, must not be {@code null}
     * @return this tube
     * @throws NullPointerException if {@code action} is {@code null}
     */
    @Override
    public Tube<T> whenTerminates(Runnable action) {
        Objects.requireNonNull(action, "The termination action cannot be null");
        this.terminationAction = action;
        return this;
    }

    /**
     * Replaces the request consumer. When demand is already outstanding at registration time,
     * the consumer is immediately invoked with that amount.
     *
     * @param consumer the consumer notified of requests, must not be {@code null}
     * @return this tube
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    @Override
    public Tube<T> whenRequested(LongConsumer consumer) {
        Objects.requireNonNull(consumer, "The request consumer cannot be null");
        this.requestConsumer = consumer;
        long outstanding = this.outstandingRequests();
        if (outstanding > 0L) {
            consumer.accept(outstanding);
        }
        return this;
    }

    /**
     * Processes a non-null item sent through {@link #send(Object)} while the tube is not
     * cancelled.
     *
     * @param var1 the item to handle
     */
    protected abstract void handleItem(T var1);

    /**
     * Delivers queued items to the subscriber, bounded by the outstanding demand, and emits the
     * terminal signal once a failure is recorded or the tube is completed and its queue is empty.
     *
     * <p>Only the caller that moves {@link #wip} away from zero runs the loop; calls made while
     * a drain is in progress are accounted for by that drain before it exits. Every exit through
     * a terminal signal or an observed cancellation leaves {@link #wip} non-zero, so no further
     * drain can start afterwards.
     */
    protected void drainLoop() {
        if (this.wip.getAndIncrement() != 0) {
            // Another caller is draining; it will notice the increment and loop again.
            return;
        }
        int missed = 1;
        final ConcurrentLinkedQueue<T> queue = this.dispatchQueue;
        while (missed != 0) {
            long emitted = 0L;
            final long pending = this.outstandingRequests();
            // NOTE(review): cancel() already runs the cancellation action, so observing the flag
            // here (e.g. after a cancel() issued from within onNext) runs it a second time.
            // Confirm that registered actions are expected to tolerate this.
            if (this.cancelled) {
                queue.clear();
                this.cancellationAction.run();
                return;
            }
            // Pre-checked loop: with no outstanding demand nothing may be emitted, otherwise
            // items would reach the subscriber unrequested and the demand counter would go
            // negative.
            while (emitted != pending) {
                if (this.cancelled) {
                    queue.clear();
                    this.cancellationAction.run();
                    return;
                }
                final T item = queue.poll();
                if (item == null) {
                    if (this.completed) {
                        // Queue drained after complete(): emit the terminal signal.
                        this.cancelled = true;
                        final Throwable err = this.failure;
                        if (err != null) {
                            this.subscriber.onError(err);
                        } else {
                            this.subscriber.onComplete();
                        }
                        this.terminationAction.run();
                        return;
                    }
                    break;
                }
                this.subscriber.onNext(item);
                emitted++;
            }
            if (emitted > 0L) {
                this.requested.addAndGet(-emitted);
            }
            if (this.cancelled) {
                queue.clear();
                return;
            }
            final Throwable err = this.failure;
            if (err != null) {
                // A recorded failure takes precedence over items still waiting in the queue.
                this.cancelled = true;
                this.subscriber.onError(err);
                this.terminationAction.run();
                return;
            }
            if (queue.isEmpty() && this.completed) {
                this.cancelled = true;
                this.subscriber.onComplete();
                this.terminationAction.run();
                return;
            }
            // Release the drain unless other calls arrived in the meantime.
            missed = this.wip.addAndGet(-missed);
        }
    }
}

