/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.mutiny.operators.multi.split;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.AbstractMulti;
import io.smallrye.mutiny.operators.multi.split.MultiSplitter;
import io.smallrye.mutiny.shaded.io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.subscription.ContextSupport;
import io.smallrye.mutiny.subscription.MultiSubscriber;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/**
 * Splits an upstream {@link Multi} into one {@link Multi} per constant of an enumeration.
 * <p>
 * Each upstream item is passed to a splitter function that returns the enum key of the split
 * that must receive it. The upstream is subscribed to lazily, when the first split gets a
 * subscriber. An item is requested from upstream only once a subscriber is registered for every
 * enum constant and every registered split has outstanding demand.
 *
 * @param <T> the type of the items
 * @param <K> the enumeration type used as split key
 */
public class MultiSplitter<T, K extends Enum<K>> {
    private final Multi<? extends T> upstream;
    private final Function<T, K> splitter;
    // Currently registered split subscriptions, at most one per key.
    private final ConcurrentHashMap<K, SplitMulti.Split> splits;
    // Number of constants in the key enumeration; all of them need a subscriber before items flow.
    private final int requiredNumberOfSubscribers;
    private final Class<K> keyType;
    private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
    private volatile Throwable terminalFailure;
    // Written in Forwarder#onSubscribe before the state moves to SUBSCRIBED.
    private Flow.Subscription upstreamSubscription;

    /**
     * Creates a new splitter.
     *
     * @param upstream the upstream to split, must not be {@code null}
     * @param keyType the enumeration class of the keys, must not be {@code null}
     * @param splitter the function mapping an item to its key, must not be {@code null}
     * @throws IllegalArgumentException if {@code keyType} is not an enumeration type
     */
    public MultiSplitter(Multi<? extends T> upstream, Class<K> keyType, Function<T, K> splitter) {
        this.upstream = ParameterValidation.nonNull(upstream, "upstream");
        if (!ParameterValidation.nonNull(keyType, "keyType").isEnum()) {
            throw new IllegalArgumentException("The key type must be that of an enumeration");
        }
        this.keyType = keyType;
        this.splitter = ParameterValidation.nonNull(splitter, "splitter");
        this.splits = new ConcurrentHashMap<>();
        this.requiredNumberOfSubscribers = keyType.getEnumConstants().length;
    }

    /**
     * Gets the {@link Multi} that receives the items mapped to {@code key}.
     *
     * @param key the split key
     * @return a new {@link Multi} for that key
     */
    @CheckReturnValue
    public Multi<T> get(K key) {
        return Infrastructure.onMultiCreation(new SplitMulti(key));
    }

    /**
     * Gets the enumeration class that was passed to the constructor.
     *
     * @return the key type
     */
    public Class<K> keyType() {
        return this.keyType;
    }

    /**
     * Requests one item from upstream if the splitter is subscribed, every key has a registered
     * split, and no registered split has a demand of zero.
     */
    private void onSplitRequest() {
        if (this.state.get() != State.SUBSCRIBED || this.splits.size() < this.requiredNumberOfSubscribers) {
            return;
        }
        for (SplitMulti.Split split : this.splits.values()) {
            if (split.demand.get() == 0L) {
                return;
            }
        }
        this.upstreamSubscription.request(1L);
    }

    /** Propagates {@link #terminalFailure} to every registered split, then forgets them all. */
    private void onUpstreamFailure() {
        for (SplitMulti.Split split : this.splits.values()) {
            split.downstream.onFailure(this.terminalFailure);
        }
        this.splits.clear();
    }

    /** Propagates completion to every registered split, then forgets them all. */
    private void onUpstreamCompletion() {
        for (SplitMulti.Split split : this.splits.values()) {
            split.downstream.onCompletion();
        }
        this.splits.clear();
    }

    /**
     * Routes an upstream item to the split selected by the splitter function.
     * <p>
     * If no split is registered for the computed key, the item is discarded and no further
     * upstream request is made from here. Any exception thrown while routing (including a
     * {@code null} key) moves the splitter to the failed state and is propagated to all
     * registered splits.
     *
     * @param item the upstream item
     */
    private void onUpstreamItem(T item) {
        try {
            K key = this.splitter.apply(item);
            if (key == null) {
                throw new NullPointerException("The splitter function returned null");
            }
            SplitMulti.Split target = this.splits.get(key);
            if (target != null) {
                target.downstream.onItem(item);
                // A demand of Long.MAX_VALUE is treated as unbounded and is never decremented.
                if (this.splits.size() == this.requiredNumberOfSubscribers
                        && (target.demand.get() == Long.MAX_VALUE || target.demand.decrementAndGet() > 0L)) {
                    this.upstreamSubscription.request(1L);
                }
            }
        } catch (Throwable err) {
            this.terminalFailure = err;
            this.state.set(State.FAILED);
            // No item will be requested or forwarded after a failure, so release the upstream
            // instead of leaving its subscription open.
            this.upstreamSubscription.cancel();
            this.onUpstreamFailure();
        }
    }

    /** Lifecycle of the shared upstream subscription. */
    private enum State {
        INIT,
        AWAITING_SUBSCRIPTION,
        SUBSCRIBED,
        COMPLETED,
        FAILED
    }

    /** The {@link Multi} exposed for a single key. */
    private class SplitMulti extends AbstractMulti<T> {
        private final K key;

        private SplitMulti(K key) {
            this.key = key;
        }

        /**
         * Registers {@code subscriber} as the split for this key.
         * <p>
         * The first subscription across all splits triggers the upstream subscription. If the
         * splitter has already failed or completed, the subscriber is terminated immediately.
         * If a subscriber is already registered for this key, the new one receives an
         * {@link IllegalStateException}.
         *
         * @param subscriber the downstream subscriber, must not be {@code null}
         */
        @Override
        public void subscribe(MultiSubscriber<? super T> subscriber) {
            ParameterValidation.nonNull(subscriber, "subscriber");
            if (MultiSplitter.this.state.compareAndSet(State.INIT, State.AWAITING_SUBSCRIPTION)) {
                MultiSplitter.this.upstream.subscribe().withSubscriber(new Forwarder(subscriber));
            }
            State stateWhenSubscribing = MultiSplitter.this.state.get();
            if (stateWhenSubscribing == State.FAILED) {
                subscriber.onSubscribe(Subscriptions.CANCELLED);
                subscriber.onFailure(MultiSplitter.this.terminalFailure);
                return;
            }
            if (stateWhenSubscribing == State.COMPLETED) {
                subscriber.onSubscribe(Subscriptions.CANCELLED);
                subscriber.onCompletion();
                return;
            }
            Split split = new Split(subscriber);
            Split previous = MultiSplitter.this.splits.putIfAbsent(this.key, split);
            if (previous == null) {
                subscriber.onSubscribe(split);
            } else {
                subscriber.onSubscribe(Subscriptions.CANCELLED);
                subscriber.onError(new IllegalStateException("There is already a subscriber for key " + this.key));
            }
        }

        /** The subscription handed to the subscriber of a split; tracks its demand. */
        private class Split implements Flow.Subscription {
            final MultiSubscriber<? super T> downstream;
            final AtomicLong demand = new AtomicLong();

            private Split(MultiSubscriber<? super T> subscriber) {
                this.downstream = subscriber;
            }

            /**
             * Adds {@code n} to this split's demand and lets the splitter decide whether an
             * upstream item can be requested. A non-positive {@code n} cancels the split and
             * signals an error to its subscriber.
             *
             * @param n the number of additional items requested
             */
            @Override
            public void request(long n) {
                if (n <= 0L) {
                    this.cancel();
                    this.downstream.onError(Subscriptions.getInvalidRequestException());
                    return;
                }
                Subscriptions.add(this.demand, n);
                MultiSplitter.this.onSplitRequest();
            }

            /** Unregisters this split from the splitter. */
            @Override
            public void cancel() {
                // Remove only if this split is still the one registered for the key, so a repeated
                // cancel cannot evict a newer subscriber that took over the same key.
                MultiSplitter.this.splits.remove(SplitMulti.this.key, this);
            }
        }
    }

    /** The single subscriber to upstream; forwards its signals to the splitter. */
    private class Forwarder implements MultiSubscriber<T>, ContextSupport {
        private final Context context;

        /**
         * @param firstSubscriber the split subscriber that triggered the upstream subscription;
         *        its context is reused when it implements {@link ContextSupport}, otherwise an
         *        empty context is used
         */
        private Forwarder(MultiSubscriber<? super T> firstSubscriber) {
            if (firstSubscriber instanceof ContextSupport) {
                this.context = ((ContextSupport) firstSubscriber).context();
            } else {
                this.context = Context.empty();
            }
        }

        /** Forwards the item to the splitter; ignored unless the state is {@code SUBSCRIBED}. */
        @Override
        public void onItem(T item) {
            if (MultiSplitter.this.state.get() != State.SUBSCRIBED) {
                return;
            }
            MultiSplitter.this.onUpstreamItem(item);
        }

        /** Records and propagates the failure if the state can move from {@code SUBSCRIBED} to {@code FAILED}. */
        @Override
        public void onFailure(Throwable failure) {
            if (MultiSplitter.this.state.compareAndSet(State.SUBSCRIBED, State.FAILED)) {
                MultiSplitter.this.terminalFailure = failure;
                MultiSplitter.this.onUpstreamFailure();
            }
        }

        /** Propagates completion if the state can move from {@code SUBSCRIBED} to {@code COMPLETED}. */
        @Override
        public void onCompletion() {
            if (MultiSplitter.this.state.compareAndSet(State.SUBSCRIBED, State.COMPLETED)) {
                MultiSplitter.this.onUpstreamCompletion();
            }
        }

        /**
         * Stores the upstream subscription and moves to {@code SUBSCRIBED}, then checks whether
         * an item can already be requested. The subscription is cancelled if the splitter is not
         * awaiting one.
         */
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            if (MultiSplitter.this.state.get() != State.AWAITING_SUBSCRIPTION) {
                subscription.cancel();
            } else {
                // Assign the subscription before publishing the SUBSCRIBED state that readers check.
                MultiSplitter.this.upstreamSubscription = subscription;
                MultiSplitter.this.state.set(State.SUBSCRIBED);
                MultiSplitter.this.onSplitRequest();
            }
        }

        @Override
        public Context context() {
            return this.context;
        }
    }
}

