/*
 * Decompiled with CFR 0.152.
 */
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.Executor;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AbstractNoHandleSubscribePublisher;
import io.servicetalk.concurrent.api.AsyncContextProvider;
import io.servicetalk.concurrent.api.CapturedContext;
import io.servicetalk.concurrent.api.Executors;
import io.servicetalk.concurrent.api.PublishAndSubscribeOnPublishers;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.ConcurrentTerminalSubscriber;
import io.servicetalk.concurrent.internal.EmptySubscriptions;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nullable;

final class TimeoutPublisher<T>
extends AbstractNoHandleSubscribePublisher<T> {
    private final Publisher<T> original;
    private final Executor timeoutExecutor;
    private final long durationNs;
    private final boolean restartAtOnNext;

    TimeoutPublisher(Publisher<T> original, long duration, TimeUnit unit, boolean restartAtOnNext, Executor timeoutExecutor) {
        this.original = Objects.requireNonNull(original);
        this.timeoutExecutor = Objects.requireNonNull(timeoutExecutor);
        this.durationNs = Math.max(0L, unit.toNanos(duration));
        this.restartAtOnNext = restartAtOnNext;
    }

    @Override
    void handleSubscribe(PublisherSource.Subscriber<? super T> subscriber, CapturedContext capturedContext, AsyncContextProvider contextProvider) {
        this.original.delegateSubscribe(TimeoutSubscriber.newInstance(this, subscriber, capturedContext, contextProvider), capturedContext, contextProvider);
    }

    private static final class TimeoutSubscriber<X>
    extends AbstractTimeoutSubscriber<X> {
        private static final Cancellable TIMER_PROCESSING = () -> {};
        private static final Cancellable TIMER_FIRED = () -> {};
        private final TimeoutPublisher<X> parent;
        private volatile long lastStartNS;

        private TimeoutSubscriber(TimeoutPublisher<X> parent, PublisherSource.Subscriber<? super X> target, AsyncContextProvider contextProvider) {
            super(target, contextProvider);
            this.parent = parent;
            this.lastStartNS = ((TimeoutPublisher)parent).timeoutExecutor.currentTime(TimeUnit.NANOSECONDS);
        }

        static <X> TimeoutSubscriber<X> newInstance(TimeoutPublisher<X> parent, PublisherSource.Subscriber<? super X> target, CapturedContext capturedContext, AsyncContextProvider contextProvider) {
            TimeoutSubscriber<X> s = new TimeoutSubscriber<X>(parent, target, contextProvider);
            s.initTimer(((TimeoutPublisher)parent).durationNs, ((TimeoutPublisher)parent).timeoutExecutor, capturedContext);
            return s;
        }

        public void onNext(X x) {
            if (((TimeoutPublisher)this.parent).restartAtOnNext) {
                this.lastStartNS = ((TimeoutPublisher)this.parent).timeoutExecutor.currentTime(TimeUnit.NANOSECONDS);
            }
            this.target.onNext(x);
        }

        public void request(long n) {
            PublisherSource.Subscription subscription = this.subscription;
            assert (subscription != null);
            subscription.request(n);
        }

        @Override
        void timerFires() {
            Cancellable previousTimerCancellable;
            while (true) {
                if ((previousTimerCancellable = this.timerCancellable) == LOCAL_IGNORE_CANCEL || previousTimerCancellable == TIMER_FIRED) {
                    return;
                }
                if (previousTimerCancellable == TIMER_PROCESSING) {
                    if (!timerCancellableUpdater.compareAndSet(this, TIMER_PROCESSING, TIMER_FIRED)) continue;
                    return;
                }
                if (timerCancellableUpdater.compareAndSet(this, previousTimerCancellable, TIMER_PROCESSING)) break;
            }
            block3: while (true) {
                Cancellable nextTimerCancellable;
                long currentTimeNs = ((TimeoutPublisher)this.parent).timeoutExecutor.currentTime(TimeUnit.NANOSECONDS);
                long nextTimeoutNs = ((TimeoutPublisher)this.parent).durationNs - (currentTimeNs - this.lastStartNS);
                if (nextTimeoutNs <= 0L) {
                    this.offloadTimeout(new TimeoutException((((TimeoutPublisher)this.parent).restartAtOnNext ? "between onNext" : "until terminal") + " timeout after " + TimeUnit.NANOSECONDS.toMillis(((TimeoutPublisher)this.parent).durationNs) + "ms"), ((TimeoutPublisher)this.parent).timeoutExecutor);
                    return;
                }
                try {
                    nextTimerCancellable = Objects.requireNonNull(((TimeoutPublisher)this.parent).timeoutExecutor.schedule(this::timerFires, nextTimeoutNs, TimeUnit.NANOSECONDS), () -> "Executor.schedule " + ((TimeoutPublisher)this.parent).timeoutExecutor + " returned null");
                }
                catch (Throwable cause) {
                    this.offloadTimeout(cause, ((TimeoutPublisher)this.parent).timeoutExecutor);
                    return;
                }
                if (timerCancellableUpdater.compareAndSet(this, previousTimerCancellable, nextTimerCancellable)) {
                    return;
                }
                while (true) {
                    if ((previousTimerCancellable = this.timerCancellable) == LOCAL_IGNORE_CANCEL) {
                        nextTimerCancellable.cancel();
                        return;
                    }
                    if (previousTimerCancellable == TIMER_FIRED) {
                        if (!timerCancellableUpdater.compareAndSet(this, TIMER_FIRED, TIMER_PROCESSING)) continue;
                        previousTimerCancellable = TIMER_PROCESSING;
                        continue block3;
                    }
                    if (timerCancellableUpdater.compareAndSet(this, previousTimerCancellable, nextTimerCancellable)) break block3;
                }
                break;
            }
        }

        @Override
        void stopTimer(boolean terminal) {
            timerCancellableUpdater.getAndSet(this, LOCAL_IGNORE_CANCEL).cancel();
        }
    }

    static abstract class AbstractTimeoutSubscriber<X>
    implements PublisherSource.Subscriber<X>,
    PublisherSource.Subscription {
        static final Cancellable LOCAL_IGNORE_CANCEL = () -> {};
        static final AtomicReferenceFieldUpdater<AbstractTimeoutSubscriber, Cancellable> timerCancellableUpdater = AtomicReferenceFieldUpdater.newUpdater(AbstractTimeoutSubscriber.class, Cancellable.class, "timerCancellable");
        static final AtomicReferenceFieldUpdater<AbstractTimeoutSubscriber, PublisherSource.Subscription> subscriptionUpdater = AtomicReferenceFieldUpdater.newUpdater(AbstractTimeoutSubscriber.class, PublisherSource.Subscription.class, "subscription");
        final ConcurrentTerminalSubscriber<? super X> target;
        final AsyncContextProvider contextProvider;
        @Nullable
        volatile PublisherSource.Subscription subscription;
        @Nullable
        volatile Cancellable timerCancellable;

        AbstractTimeoutSubscriber(PublisherSource.Subscriber<? super X> target, AsyncContextProvider contextProvider) {
            this.target = new ConcurrentTerminalSubscriber(target, false);
            this.contextProvider = contextProvider;
        }

        final void initTimer(long durationNs, Executor timeoutExecutor, CapturedContext capturedContext) {
            try {
                timerCancellableUpdater.compareAndSet(this, null, Objects.requireNonNull(timeoutExecutor.schedule(this::timerFires, durationNs, TimeUnit.NANOSECONDS)));
            }
            catch (Throwable cause) {
                AbstractTimeoutSubscriber.handleConstructorException(this, capturedContext, this.contextProvider, cause);
            }
        }

        public final void onSubscribe(PublisherSource.Subscription s) {
            if (subscriptionUpdater.compareAndSet(this, null, (PublisherSource.Subscription)ConcurrentSubscription.wrap((PublisherSource.Subscription)s))) {
                this.target.onSubscribe((PublisherSource.Subscription)this);
            } else {
                s.cancel();
            }
        }

        public final void onError(Throwable t) {
            if (this.target.processOnError(t)) {
                this.stopTimer(true);
            }
        }

        public final void onComplete() {
            if (this.target.processOnComplete()) {
                this.stopTimer(true);
            }
        }

        public final void cancel() {
            PublisherSource.Subscription subscription = this.subscription;
            assert (subscription != null);
            try {
                this.stopTimer(true);
            }
            finally {
                subscription.cancel();
            }
        }

        abstract void stopTimer(boolean var1);

        abstract void timerFires();

        final void offloadTimeout(Throwable cause, Executor executor) {
            if (Executors.immediate() == executor) {
                this.processTimeout(cause);
            } else {
                this.contextProvider.wrapConsumer(this::processTimeout, this.contextProvider.captureContext()).accept(cause);
            }
        }

        final void processTimeout(Throwable cause) {
            PublisherSource.Subscription subscription = subscriptionUpdater.getAndSet(this, EmptySubscriptions.EMPTY_SUBSCRIPTION);
            if (this.target.deferredOnError(cause)) {
                try {
                    if (subscription != null) {
                        subscription.cancel();
                    } else {
                        this.target.onSubscribe(EmptySubscriptions.EMPTY_SUBSCRIPTION);
                    }
                }
                finally {
                    this.target.deliverDeferredTerminal();
                }
            }
        }

        private static <X> void handleConstructorException(AbstractTimeoutSubscriber<X> s, CapturedContext capturedContext, AsyncContextProvider contextProvider, Throwable cause) {
            s.timerCancellable = LOCAL_IGNORE_CANCEL;
            s.subscription = EmptySubscriptions.EMPTY_SUBSCRIPTION;
            PublishAndSubscribeOnPublishers.deliverOnSubscribeAndOnError(s.target, capturedContext, contextProvider, cause);
        }
    }
}

