/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.streams;

import io.reactivex.Flowable;
import io.smallrye.reactive.streams.operators.Operator;
import io.smallrye.reactive.streams.operators.ProcessingStage;
import io.smallrye.reactive.streams.operators.ProcessorOperator;
import io.smallrye.reactive.streams.operators.PublisherOperator;
import io.smallrye.reactive.streams.operators.PublisherStage;
import io.smallrye.reactive.streams.operators.TerminalOperator;
import io.smallrye.reactive.streams.operators.TerminalStage;
import io.smallrye.reactive.streams.spi.Transformer;
import io.smallrye.reactive.streams.stages.Stages;
import io.smallrye.reactive.streams.utils.ConnectableProcessor;
import io.smallrye.reactive.streams.utils.DefaultSubscriberWithCompletionStage;
import io.smallrye.reactive.streams.utils.WrappedProcessor;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.streams.operators.spi.Graph;
import org.eclipse.microprofile.reactive.streams.operators.spi.ReactiveStreamsEngine;
import org.eclipse.microprofile.reactive.streams.operators.spi.Stage;
import org.eclipse.microprofile.reactive.streams.operators.spi.SubscriberWithCompletionStage;
import org.eclipse.microprofile.reactive.streams.operators.spi.UnsupportedStageException;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;

public class Engine
implements ReactiveStreamsEngine {
    public <T> Publisher<T> buildPublisher(Graph graph) {
        Flowable flowable = null;
        for (Stage stage : graph.getStages()) {
            Operator<Stage> operator = Stages.lookup(stage);
            if (flowable == null) {
                if (operator instanceof PublisherOperator) {
                    flowable = this.createPublisher(stage, (PublisherOperator)operator);
                    continue;
                }
                throw new IllegalArgumentException("Expecting a publisher stage, got a " + stage);
            }
            if (operator instanceof ProcessorOperator) {
                flowable = this.applyProcessors(flowable, stage, (ProcessorOperator)operator);
                continue;
            }
            throw new IllegalArgumentException("Expecting a processor stage, got a " + stage);
        }
        return flowable;
    }

    public <T, R> SubscriberWithCompletionStage<T, R> buildSubscriber(Graph graph) {
        ConnectableProcessor processor = new ConnectableProcessor();
        Flowable flowable = Flowable.fromPublisher(processor);
        for (Stage stage : graph.getStages()) {
            Operator<Stage> operator = Stages.lookup(stage);
            if (operator instanceof ProcessorOperator) {
                flowable = this.applyProcessors(flowable, stage, (ProcessorOperator)operator);
                continue;
            }
            if (operator instanceof TerminalOperator) {
                CompletionStage<R> result = this.applySubscriber(Transformer.apply(flowable), stage, (TerminalOperator)operator);
                return new DefaultSubscriberWithCompletionStage(processor, result);
            }
            throw new UnsupportedStageException(stage);
        }
        throw new IllegalArgumentException("The graph does not have a valid final stage");
    }

    public <T, R> Processor<T, R> buildProcessor(Graph graph) {
        ConnectableProcessor processor = new ConnectableProcessor();
        Flowable flowable = Flowable.fromPublisher(processor);
        for (Stage stage : graph.getStages()) {
            Operator<Stage> operator = Stages.lookup(stage);
            flowable = this.applyProcessors(flowable, stage, (ProcessorOperator)operator);
        }
        return new WrappedProcessor(processor, flowable);
    }

    public <T> CompletionStage<T> buildCompletion(Graph graph) {
        Flowable flowable = null;
        for (Stage stage : graph.getStages()) {
            Operator<Stage> operator = Stages.lookup(stage);
            if (operator instanceof PublisherOperator) {
                flowable = this.createPublisher(stage, (PublisherOperator)operator);
                continue;
            }
            if (operator instanceof ProcessorOperator) {
                flowable = this.applyProcessors(flowable, stage, (ProcessorOperator)operator);
                continue;
            }
            return this.applySubscriber(flowable, stage, (TerminalOperator)operator);
        }
        throw new IllegalArgumentException("Graph did not have terminal stage");
    }

    private <I, O> Flowable<O> applyProcessors(Flowable<I> flowable, Stage stage, ProcessorOperator operator) {
        ProcessingStage ps = operator.create(this, stage);
        return Transformer.apply(ps.apply(flowable));
    }

    private <T, R> CompletionStage<R> applySubscriber(Flowable<T> flowable, Stage stage, TerminalOperator operator) {
        TerminalStage ps = operator.create(this, stage);
        return ps.apply(Transformer.apply(flowable));
    }

    private <O> Flowable<O> createPublisher(Stage stage, PublisherOperator operator) {
        PublisherStage ps = operator.create(this, stage);
        return Transformer.apply(ps.get());
    }
}

