/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import io.atleon.core.Acknowledgement;
import io.atleon.core.Alo;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFailureStrategy;
import java.lang.invoke.LambdaMetafactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

final class AcknowledgingPublisher<T>
implements Publisher<Alo<T>> {
    private final Alo<Publisher<T>> aloSource;
    private final AtomicBoolean subscribedOnce = new AtomicBoolean(false);

    private AcknowledgingPublisher(Alo<Publisher<T>> aloSource) {
        this.aloSource = aloSource;
    }

    public static <T> Publisher<Alo<T>> fromAloPublisher(Alo<Publisher<T>> aloPublisher) {
        return new AcknowledgingPublisher<T>(aloPublisher);
    }

    public void subscribe(Subscriber<? super Alo<T>> subscriber) {
        if (!this.subscribedOnce.compareAndSet(false, true)) {
            throw new IllegalStateException("AcknowledgingPublisher may only be subscribed to once");
        }
        AcknowledgingSubscriber<T> acknowledgingSubscriber = new AcknowledgingSubscriber<T>(this.aloSource, subscriber);
        this.aloSource.runInContext(() -> this.aloSource.get().subscribe(acknowledgingSubscriber));
    }

    private static final class AcknowledgingSubscriber<T>
    implements Subscriber<T> {
        private static final AtomicReferenceFieldUpdater<AcknowledgingSubscriber, State> STATE = AtomicReferenceFieldUpdater.newUpdater(AcknowledgingSubscriber.class, State.class, "state");
        private static final AtomicLongFieldUpdater<AcknowledgingSubscriber> COUNT = AtomicLongFieldUpdater.newUpdater(AcknowledgingSubscriber.class, "count");
        private final Alo<Publisher<T>> aloSource;
        private final AloFactory<T> factory;
        private final Subscriber<? super Alo<T>> subscriber;
        private volatile State state = State.ACTIVE;
        private volatile long count = 1L;

        public AcknowledgingSubscriber(Alo<Publisher<T>> aloSource, Subscriber<? super Alo<T>> subscriber) {
            this.aloSource = aloSource;
            this.factory = aloSource.propagator();
            this.subscriber = subscriber;
        }

        public void onSubscribe(Subscription subscription) {
            this.subscriber.onSubscribe(subscription);
        }

        public void onNext(T value) {
            Acknowledgement acknowledgement = Acknowledgement.create(this::inFlightAcknowledged, this::inFlightNacknowledged);
            COUNT.incrementAndGet(this);
            this.subscriber.onNext(this.factory.create(value, acknowledgement::positive, acknowledgement::negative));
        }

        /*
         * Unable to fully structure code
         */
        public void onError(Throwable error) {
            errorReference = new AtomicReference<Object>(null);
            if (!AcknowledgingSubscriber.STATE.compareAndSet(this, State.ACTIVE, State.EXECUTING)) ** GOTO lbl-1000
            if (AloFailureStrategy.choose(this.subscriber).process(this.aloSource, error, (Consumer<Throwable>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)V, set(V ), (Ljava/lang/Throwable;)V)(errorReference))) {
                AcknowledgingSubscriber.STATE.set(this, State.EXECUTED);
            } else lbl-1000:
            // 2 sources

            {
                this.maybeExecuteNacknowledger(error, (Predicate<State>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Z, lambda$onError$0(io.atleon.core.AcknowledgingPublisher$AcknowledgingSubscriber$State ), (Lio/atleon/core/AcknowledgingPublisher$AcknowledgingSubscriber$State;)Z)());
            }
            errorToEmit = errorReference.get();
            if (errorToEmit != null) {
                this.subscriber.onError(errorToEmit);
            } else {
                this.subscriber.onComplete();
            }
        }

        public void onComplete() {
            if (STATE.compareAndSet(this, State.ACTIVE, State.IN_FLIGHT) && COUNT.decrementAndGet(this) == 0L) {
                this.maybeExecuteAcknowledger();
            }
            this.subscriber.onComplete();
        }

        private void inFlightAcknowledged() {
            if (COUNT.decrementAndGet(this) == 0L) {
                this.maybeExecuteAcknowledger();
            }
        }

        private void inFlightNacknowledged(Throwable error) {
            this.maybeExecuteNacknowledger(error, priorState -> priorState == State.ACTIVE || priorState == State.IN_FLIGHT);
        }

        private void maybeExecuteAcknowledger() {
            if (STATE.compareAndSet(this, State.IN_FLIGHT, State.EXECUTED)) {
                Alo.acknowledge(this.aloSource);
            }
        }

        private void maybeExecuteNacknowledger(Throwable error, Predicate<State> priorStateMustMatch) {
            if (priorStateMustMatch.test(STATE.getAndSet(this, State.EXECUTED))) {
                Alo.nacknowledge(this.aloSource, error);
            }
        }

        private static /* synthetic */ boolean lambda$onError$0(State priorState) {
            return priorState == State.ACTIVE || priorState == State.EXECUTING;
        }

        private static enum State {
            ACTIVE,
            IN_FLIGHT,
            EXECUTING,
            EXECUTED;

        }
    }
}

