/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.microprofile.messaging;

import io.helidon.microprofile.messaging.MessageUtils;
import io.helidon.microprofile.messaging.ProcessorMethod;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class ProxyProcessor
implements Processor<Object, Object> {
    private final ProcessorMethod processorMethod;
    private final Publisher<Object> publisher;
    private Subscriber<? super Object> subscriber;
    private Processor<Object, Object> processor;
    private boolean subscribed = false;

    ProxyProcessor(ProcessorMethod processorMethod) {
        this.processorMethod = processorMethod;
        switch (processorMethod.getType()) {
            case PROCESSOR_PUBLISHER_BUILDER_MSG_2_PUBLISHER_BUILDER_MSG: 
            case PROCESSOR_PUBLISHER_BUILDER_PAYL_2_PUBLISHER_BUILDER_PAYL: {
                PublisherBuilder paramPublisherBuilder = ReactiveStreams.fromPublisher((Publisher)this);
                this.publisher = ((PublisherBuilder)processorMethod.invoke(paramPublisherBuilder)).buildRs();
                break;
            }
            case PROCESSOR_PUBLISHER_MSG_2_PUBLISHER_MSG: 
            case PROCESSOR_PUBLISHER_PAYL_2_PUBLISHER_PAYL: {
                this.publisher = (Publisher)processorMethod.invoke(this);
                break;
            }
            case PROCESSOR_PROCESSOR_BUILDER_MSG_2_VOID: {
                this.processor = ((ProcessorBuilder)processorMethod.invoke(new Object[0])).buildRs();
                this.publisher = this.processor;
                break;
            }
            case PROCESSOR_PROCESSOR_BUILDER_PAYL_2_VOID: {
                this.processor = ReactiveStreams.builder().map(MessageUtils::unwrap).via((ProcessorBuilder)processorMethod.invoke(new Object[0])).map(MessageUtils::wrap).buildRs();
                this.publisher = this.processor;
                break;
            }
            case PROCESSOR_PROCESSOR_MSG_2_VOID: {
                this.processor = (Processor)processorMethod.invoke(new Object[0]);
                this.publisher = this.processor;
                break;
            }
            case PROCESSOR_PROCESSOR_PAYL_2_VOID: {
                this.processor = ReactiveStreams.builder().map(MessageUtils::unwrap).via((Processor)processorMethod.invoke(new Object[0])).map(MessageUtils::wrap).buildRs();
                this.publisher = this.processor;
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown signature type " + processorMethod.getType());
            }
        }
    }

    public void subscribe(Subscriber<? super Object> s) {
        if (this.processor != null) {
            this.processor.subscribe(s);
            this.subscriber = this.processor;
        } else if (!this.subscribed && this.publisher != null) {
            this.subscribed = true;
            this.publisher.subscribe(s);
        } else {
            this.subscriber = s;
        }
    }

    public void onSubscribe(Subscription s) {
        this.subscriber.onSubscribe(s);
    }

    public void onNext(Object o) {
        this.preProcess(o);
        this.subscriber.onNext(MessageUtils.unwrap(o, this.processorMethod.getMethod()));
    }

    public void onError(Throwable t) {
        this.subscriber.onError(t);
    }

    public void onComplete() {
        this.subscriber.onComplete();
    }

    private void preProcess(Object incomingValue) {
        if (this.processorMethod.getAckStrategy().equals((Object)Acknowledgment.Strategy.PRE_PROCESSING) && incomingValue instanceof Message) {
            Message incomingMessage = (Message)incomingValue;
            incomingMessage.ack();
        }
    }
}

