/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import io.atleon.core.Alo;
import io.atleon.core.SerialQueue;
import io.atleon.util.Consuming;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.util.context.Context;

/**
 * Operator that re-emits every upstream {@link Alo} with its nacknowledger replaced by one that
 * hands the error to a delegator function instead of nacknowledging directly.
 *
 * <p>When a downstream consumer nacknowledges an emitted element, the delegator is applied to the
 * element's data and the error, and the {@link Publisher} it returns is subscribed to:
 * <ul>
 *   <li>completion of that publisher runs the original element's acknowledger;</li>
 *   <li>an error from that publisher (or an exception thrown by the delegator itself) runs the
 *       original element's nacknowledger with the <em>original</em> error, to which the delegation
 *       failure has been added as a suppressed exception.</li>
 * </ul>
 *
 * @param <T> type of data carried by the {@link Alo} elements
 */
final class AloErrorDelegatingOperator<T> extends FluxOperator<Alo<T>, Alo<T>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AloErrorDelegatingOperator.class);

    /** Maps (data, error) to the publisher that performs the delegation. */
    private final BiFunction<? super T, ? super Throwable, ? extends Publisher<?>> delegator;

    /**
     * @param source    upstream sequence of elements to decorate
     * @param delegator function producing the delegation publisher for a nacknowledged element
     */
    AloErrorDelegatingOperator(
            Flux<Alo<T>> source,
            BiFunction<? super T, ? super Throwable, ? extends Publisher<?>> delegator) {
        super(source);
        this.delegator = delegator;
    }

    @Override
    public void subscribe(CoreSubscriber<? super Alo<T>> actual) {
        source.subscribe(new AloErrorDelegatingSubscriber(actual));
    }

    /**
     * Subscriber that decorates elements on their way downstream and tracks the delegations that
     * are currently running so they can be disposed on cancellation or upstream error.
     */
    private final class AloErrorDelegatingSubscriber implements CoreSubscriber<Alo<T>> {

        private final CoreSubscriber<? super Alo<T>> actual;

        /**
         * Identity-based set of connections for delegations that have been started and have not
         * yet terminated. Every read and mutation of the set in this class goes through this
         * queue. NOTE(review): SerialQueue is presumably what serializes those accesses across
         * threads - confirm against its implementation.
         */
        private final SerialQueue<Consumer<Collection<Disposable>>> inFlight =
                SerialQueue.on(Collections.newSetFromMap(new IdentityHashMap<Disposable, Boolean>()));

        /** Set once downstream has cancelled or upstream has errored; blocks new delegations. */
        private volatile boolean unsuccessfullyDone = false;

        private AloErrorDelegatingSubscriber(CoreSubscriber<? super Alo<T>> actual) {
            this.actual = actual;
        }

        @Override
        public Context currentContext() {
            return actual.currentContext();
        }

        /**
         * Passes a decorated subscription downstream: requests are forwarded unchanged, while
         * cancellation additionally marks this subscriber as done and disposes in-flight
         * delegations.
         */
        @Override
        public void onSubscribe(final Subscription s) {
            Subscription decoratedSubscription = new Subscription() {

                @Override
                public void request(long n) {
                    s.request(n);
                }

                @Override
                public void cancel() {
                    // Flag first so that delegations queued from now on are not connected.
                    unsuccessfullyDone = true;
                    try {
                        s.cancel();
                    } finally {
                        // Dispose even if cancelling upstream throws.
                        safelyDisposeAllInFlight();
                    }
                }
            };
            actual.onSubscribe(decoratedSubscription);
        }

        /**
         * Emits a copy of the element whose nacknowledger starts an error delegation (run in the
         * element's context) rather than invoking the original nacknowledger directly.
         */
        @Override
        public void onNext(Alo<T> alo) {
            Consumer<Throwable> nacknowledger = error -> alo.runInContext(() -> delegateAloError(alo, error));
            actual.onNext(alo.propagator().create(alo.get(), alo.getAcknowledger(), nacknowledger));
        }

        /** Marks this subscriber as done, disposes in-flight delegations, then forwards the error. */
        @Override
        public void onError(Throwable t) {
            unsuccessfullyDone = true;
            safelyDisposeAllInFlight();
            actual.onError(t);
        }

        /** Forwards completion; delegations already in flight are left running. */
        @Override
        public void onComplete() {
            actual.onComplete();
        }

        /**
         * Starts the delegation for a nacknowledged element.
         *
         * <p>The delegation is published as a {@link ConnectableFlux} so that the terminal
         * handlers can be attached before anything runs, and so that the connection is made from
         * within the in-flight queue, where its {@link Disposable} is registered.
         *
         * @param alo   the original (undecorated) element that was nacknowledged
         * @param error the error it was nacknowledged with
         */
        private void delegateAloError(Alo<T> alo, Throwable error) {
            AtomicReference<Disposable> disposableReference = new AtomicReference<>();
            ConnectableFlux<?> connectableFlux = delegateError(alo.get(), error)
                    // Once the delegation terminates, drop its connection from the in-flight set.
                    .doAfterTerminate(() ->
                            inFlight.addAndDrain(disposables -> disposables.remove(disposableReference.get())))
                    .publish();

            // Delegation error -> original nacknowledger; delegation completion -> original acknowledger.
            connectableFlux.subscribe(Consuming.noOp(), alo.getNacknowledger(), alo.getAcknowledger());

            inFlight.addAndDrain(disposables -> {
                // After cancellation or upstream error the delegation is never connected.
                if (!unsuccessfullyDone) {
                    connectableFlux.connect(disposable -> {
                        disposableReference.set(disposable);
                        disposables.add(disposable);
                    });
                }
            });
        }

        /**
         * Disposes every delegation currently registered as in flight. Exceptions raised while
         * doing so are logged rather than propagated.
         */
        private void safelyDisposeAllInFlight() {
            try {
                inFlight.addAndDrain(disposables -> disposables.forEach(Disposable::dispose));
            } catch (Exception e) {
                LOGGER.error("Failed to dispose all in-flight error delegations. This may cause a memory leak...", e);
            }
        }

        /**
         * Applies the delegator and adapts its result to a {@link Flux}.
         *
         * @param t     data of the nacknowledged element
         * @param error the error it was nacknowledged with
         * @return the delegation; any failure it emits, or any throwable raised while invoking
         *         the delegator, is replaced by {@code error} with that failure suppressed on it
         */
        private Flux<?> delegateError(T t, Throwable error) {
            try {
                return Flux.from(delegator.apply(t, error))
                        .onErrorMap(delegateFailure -> consolidateErrors(error, delegateFailure));
            } catch (Throwable delegateFailure) {
                // The delegator itself threw: surface that as a failed delegation.
                return Flux.error(consolidateErrors(error, delegateFailure));
            }
        }

        /**
         * Attaches {@code delegateError} to {@code originalError} as a suppressed exception,
         * unless they are the same instance (self-suppression is rejected by
         * {@link Throwable#addSuppressed(Throwable)}).
         *
         * @return {@code originalError}
         */
        private Throwable consolidateErrors(Throwable originalError, Throwable delegateError) {
            if (originalError != delegateError) {
                originalError.addSuppressed(delegateError);
            }
            return originalError;
        }
    }
}

