/*
 * Decompiled with CFR 0.152.
 */
package io.atleon.core;

import io.atleon.core.Alo;
import io.atleon.core.AloFailureStrategy;
import io.atleon.core.DiscardHook;
import io.atleon.core.PresentAlo;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.SynchronousSink;
import reactor.util.context.ContextView;

/**
 * Static factories for the handler functions and helper operations applied to {@link Alo}
 * items. The {@code BiConsumer} handlers produced here take an {@link Alo} and a
 * {@link SynchronousSink}; for each input they emit at most one item to the sink, or hand
 * the input off to a discard/failure path.
 *
 * <p>This class is not instantiable.
 */
final class AloOps {
    private static final Logger LOGGER = LoggerFactory.getLogger(AloOps.class);

    private AloOps() {
    }

    /**
     * Creates a handler that forwards an {@link Alo} downstream when its value satisfies the
     * predicate, and discards it otherwise.
     *
     * <p>The predicate is evaluated through {@link Alo#supplyInContext}. If evaluation throws,
     * the error is passed to the failure strategy chosen for the sink, and the Alo is
     * nacknowledged when that strategy reports the failure as unprocessed.
     *
     * @param predicate        test applied to the Alo's value
     * @param negativeConsumer invoked with the Alo after the discard hook when the test fails
     * @param <T>              type of value carried by the Alo
     * @return the filtering handler
     */
    public static <T> BiConsumer<Alo<T>, SynchronousSink<Alo<T>>> filteringHandler(Predicate<? super T> predicate, Consumer<? super Alo<T>> negativeConsumer) {
        return (alo, sink) -> {
            // Stays null when evaluation failed, in which case the failure path has already run
            Boolean result = null;
            try {
                result = alo.supplyInContext(() -> predicate.test(alo.get()));
            } catch (Throwable error) {
                processFailureOrNacknowledge(sink, alo, error);
            }

            // Emission/discard happens outside the try so their errors are not treated as
            // predicate failures
            if (result != null) {
                if (result) {
                    sink.next(alo);
                } else {
                    handleDiscard(sink.contextView(), alo, negativeConsumer);
                }
            }
        };
    }

    /**
     * Creates a handler that forwards an {@link Alo} downstream when its value is an instance
     * of the given class, and discards it otherwise. A {@code null} value is not an instance
     * of any class and is therefore discarded.
     *
     * @param clazz            the type that values must be an instance of to be emitted
     * @param negativeConsumer invoked with the Alo after the discard hook when the value does
     *                         not match
     * @param <T>              type of value carried by the incoming Alo
     * @param <R>              type that emitted values are narrowed to
     * @return the type-filtering handler
     */
    @SuppressWarnings("unchecked")
    public static <T, R> BiConsumer<Alo<T>, SynchronousSink<Alo<R>>> typeFilteringHandler(Class<R> clazz, Consumer<? super Alo<T>> negativeConsumer) {
        return (alo, sink) -> {
            if (clazz.isInstance(alo.get())) {
                // Unchecked but safe: the value was just verified to be an instance of R
                sink.next((Alo<R>) alo);
            } else {
                handleDiscard(sink.contextView(), alo, negativeConsumer);
            }
        };
    }

    /**
     * Creates a handler that maps an {@link Alo} via {@link Alo#map} and emits the result.
     *
     * <p>If mapping throws, or the Alo implementation returns a {@code null} mapping, the
     * error is passed to the failure strategy chosen for the sink, and the Alo is
     * nacknowledged when that strategy reports the failure as unprocessed.
     *
     * @param mapper function applied to the Alo's value
     * @param <T>    type of value carried by the incoming Alo
     * @param <R>    type of value carried by the emitted Alo
     * @return the mapping handler
     */
    public static <T, R> BiConsumer<Alo<T>, SynchronousSink<Alo<R>>> mappingHandler(Function<? super T, ? extends R> mapper) {
        return (alo, sink) -> {
            // Stays null when mapping failed, in which case the failure path has already run
            Alo<R> result = null;
            try {
                result = Objects.requireNonNull(alo.map(mapper), "Alo implementation returned null mapping");
            } catch (Throwable error) {
                processFailureOrNacknowledge(sink, alo, error);
            }

            if (result != null) {
                sink.next(result);
            }
        };
    }

    /**
     * Creates a handler that maps an {@link Alo} to an {@link Optional} value and emits the
     * result (wrapped via {@link PresentAlo#wrap}) only when the Optional is present. When
     * the Optional is empty, the original Alo is handed to {@code absentConsumer}.
     *
     * <p>If mapping throws, or the Alo implementation returns a {@code null} mapping, the
     * error is passed to the failure strategy chosen for the sink, and the Alo is
     * nacknowledged when that strategy reports the failure as unprocessed.
     *
     * @param mapper         function producing an Optional from the Alo's value
     * @param absentConsumer invoked with the original Alo when the mapped Optional is empty
     * @param <T>            type of value carried by the incoming Alo
     * @param <R>            type of value carried by the emitted Alo
     * @return the present-mapping handler
     */
    public static <T, R> BiConsumer<Alo<T>, SynchronousSink<Alo<R>>> mappingPresentHandler(Function<? super T, Optional<? extends R>> mapper, Consumer<? super Alo<T>> absentConsumer) {
        return (alo, sink) -> {
            // Stays null when mapping failed, in which case the failure path has already run
            Alo<Optional<? extends R>> result = null;
            try {
                result = Objects.requireNonNull(alo.map(mapper), "Alo implementation returned null mapping");
            } catch (Throwable error) {
                processFailureOrNacknowledge(sink, alo, error);
            }

            if (result != null) {
                if (result.get().isPresent()) {
                    sink.next(PresentAlo.wrap(result));
                } else {
                    absentConsumer.accept(alo);
                }
            }
        };
    }

    /**
     * Creates a handler that consumes an {@link Alo}'s value and never emits to the sink.
     *
     * <p>The consumer is run through {@link Alo#runInContext}. On success the Alo is handed to
     * {@code afterSuccessConsumer}. If consumption throws, the error is passed to the failure
     * strategy chosen for the sink, and the Alo is nacknowledged when that strategy reports
     * the failure as unprocessed.
     *
     * @param consumer             consumer applied to the Alo's value
     * @param afterSuccessConsumer invoked with the Alo after the consumer completes normally
     * @param <T>                  type of value carried by the Alo
     * @return the consuming handler
     */
    public static <T> BiConsumer<Alo<T>, SynchronousSink<Alo<Void>>> consumingHandler(Consumer<? super T> consumer, Consumer<? super Alo<T>> afterSuccessConsumer) {
        return (alo, sink) -> {
            boolean consumed = false;
            try {
                alo.runInContext(() -> consumer.accept(alo.get()));
                consumed = true;
            } catch (Throwable error) {
                processFailureOrNacknowledge(sink, alo, error);
            }

            // Invoked outside the try so that its errors are not treated as consumption failures
            if (consumed) {
                afterSuccessConsumer.accept(alo);
            }
        };
    }

    /**
     * Creates a handler that inspects each {@link Alo}'s value for failure. Values not
     * identified as failures are emitted unchanged. For values identified as failures, the
     * extracted error is passed to the failure strategy chosen for the sink; when that
     * strategy reports the failure as unprocessed, the Alo is emitted anyway.
     *
     * @param isFailure      test identifying values that represent a failure
     * @param errorExtractor function producing the error for a failed value
     * @param <T>            type of value carried by the Alo
     * @return the failure-processing handler
     */
    public static <T> BiConsumer<Alo<T>, SynchronousSink<Alo<T>>> failureProcessingHandler(Predicate<? super T> isFailure, Function<? super T, ? extends Throwable> errorExtractor) {
        return (alo, sink) -> {
            T t = alo.get();
            if (isFailure.test(t)) {
                processFailure(sink, alo, errorExtractor.apply(t), () -> sink.next(alo));
            } else {
                sink.next(alo);
            }
        };
    }

    /**
     * Creates an operator that rebuilds an {@link Alo} (through its propagator) with an
     * acknowledger that first applies the decorator to the Alo's value and then runs the
     * original acknowledger. The value and nacknowledger are carried over unchanged.
     *
     * @param decorator consumer run with the Alo's value before the original acknowledger
     * @param <T>       type of value carried by the Alo
     * @return the decorating operator
     */
    public static <T> UnaryOperator<Alo<T>> acknowledgerDecorator(Consumer<? super T> decorator) {
        return alo -> {
            T t = alo.get();
            Runnable decoratedAcknowledger = combineRunnables(() -> decorator.accept(t), alo.getAcknowledger());
            return alo.<T>propagator().create(t, decoratedAcknowledger, alo.getNacknowledger());
        };
    }

    /**
     * Combines a list of {@link Alo}s into a single Alo of the list of their values.
     *
     * <p>A single-element list is mapped directly to a singleton list. Otherwise the result is
     * created from the first Alo's fan-in propagator, with an acknowledger that runs every
     * source acknowledger in order and a nacknowledger that passes the error to every source
     * nacknowledger in order.
     *
     * @param alos the Alos to combine; must contain at least one element
     * @param <T>  type of value carried by each Alo
     * @return an Alo whose value is the list of the source values
     * @throws IndexOutOfBoundsException if {@code alos} is empty
     */
    public static <T> Alo<List<T>> fanIn(List<Alo<T>> alos) {
        Alo<T> firstAlo = alos.get(0);
        if (alos.size() == 1) {
            return firstAlo.map(Collections::singletonList);
        }
        return firstAlo.<T>fanInPropagator(alos).create(
            alos.stream().map(Alo::get).collect(Collectors.toList()),
            combineAcknowledgers(alos.stream().map(Alo::getAcknowledger).collect(Collectors.toList())),
            combineNacknowledgers(alos.stream().map(Alo::getNacknowledger).collect(Collectors.toList()))
        );
    }

    /**
     * Processes a failure with the sink's failure strategy, nacknowledging the Alo with the
     * error when the strategy reports the failure as unprocessed.
     */
    private static void processFailureOrNacknowledge(SynchronousSink<?> sink, Alo<?> alo, Throwable error) {
        processFailure(sink, alo, error, () -> Alo.nacknowledge(alo, error));
    }

    /**
     * Passes the failure to the {@link AloFailureStrategy} chosen for the sink, giving it the
     * sink's error emitter. Runs {@code unprocessedFallback} when the strategy's
     * {@code process} call returns {@code false}.
     */
    private static void processFailure(SynchronousSink<?> sink, Alo<?> alo, Throwable error, Runnable unprocessedFallback) {
        if (!AloFailureStrategy.choose(sink).process(alo, error, sink::error)) {
            unprocessedFallback.run();
        }
    }

    /**
     * Applies the {@link DiscardHook} chosen from the context view to the Alo's value, run
     * through {@link Alo#runInContext}. Errors thrown by the hook are logged as warnings and
     * not propagated. {@code afterHandle} is always invoked with the Alo afterwards.
     */
    private static <T> void handleDiscard(ContextView contextView, Alo<T> alo, Consumer<? super Alo<T>> afterHandle) {
        try {
            alo.runInContext(() -> DiscardHook.choose(contextView).accept(alo.get()));
        } catch (Throwable error) {
            LOGGER.warn("Error in discard hook", error);
        } finally {
            afterHandle.accept(alo);
        }
    }

    /**
     * Returns a Runnable that runs each of the given acknowledgers in iteration order.
     */
    private static Runnable combineAcknowledgers(Iterable<? extends Runnable> acknowledgers) {
        return () -> acknowledgers.forEach(Runnable::run);
    }

    /**
     * Returns a Consumer that passes its error to each of the given nacknowledgers in
     * iteration order.
     */
    private static Consumer<? super Throwable> combineNacknowledgers(Iterable<? extends Consumer<? super Throwable>> nacknowledgers) {
        return error -> nacknowledgers.forEach(nacknowledger -> nacknowledger.accept(error));
    }

    /**
     * Returns a Runnable that runs {@code first} and then {@code second}.
     */
    private static Runnable combineRunnables(Runnable first, Runnable second) {
        return () -> {
            first.run();
            second.run();
        };
    }
}

