/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.microprofile.messaging;

import io.helidon.microprofile.messaging.CompletableQueue;
import io.helidon.microprofile.messaging.MessageUtils;
import io.helidon.microprofile.messaging.ProcessorMethod;
import java.lang.reflect.Method;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class InternalProcessor
implements Processor<Object, Object> {
    private final ProcessorMethod processorMethod;
    private Subscriber<? super Object> subscriber;
    private final CompletableQueue<Object> completableQueue;

    InternalProcessor(ProcessorMethod processorMethod) {
        this.processorMethod = processorMethod;
        this.completableQueue = CompletableQueue.create();
    }

    public void subscribe(Subscriber<? super Object> s) {
        this.subscriber = s;
    }

    public void onSubscribe(Subscription s) {
        this.subscriber.onSubscribe(s);
        this.completableQueue.onEachComplete((o, throwable) -> {
            if (Objects.isNull(throwable)) {
                Object incomingValue = o.getMetadata();
                Object outgoingValue = o.getValue();
                this.subscriber.onNext(this.postProcess(incomingValue, outgoingValue));
            } else {
                this.subscriber.onError(throwable);
            }
        });
    }

    public void onNext(Object incomingValue) {
        try {
            Method method = this.processorMethod.getMethod();
            Class<?> paramType = method.getParameterTypes()[0];
            Object processedValue = method.invoke(this.processorMethod.getBeanInstance(), this.preProcess(incomingValue, paramType));
            if (processedValue instanceof Publisher || processedValue instanceof PublisherBuilder) {
                PublisherBuilder publisherBuilder = processedValue instanceof Publisher ? ReactiveStreams.fromPublisher((Publisher)((Publisher)processedValue)) : (PublisherBuilder)processedValue;
                publisherBuilder.flatMapCompletionStage(o -> {
                    if (o instanceof CompletionStage) {
                        return (CompletionStage)o;
                    }
                    return CompletableFuture.completedStage(o);
                }).map(o -> this.postProcess(incomingValue, o)).to(this.subscriber).run();
            } else if (!this.completionStageAwait(incomingValue, processedValue)) {
                this.subscriber.onNext(this.postProcess(incomingValue, processedValue));
            }
        }
        catch (Throwable e) {
            this.subscriber.onError(e);
        }
    }

    private Object preProcess(Object incomingValue, Class<?> expectedParamType) {
        if (this.processorMethod.getAckStrategy().equals((Object)Acknowledgment.Strategy.PRE_PROCESSING) && incomingValue instanceof Message) {
            Message incomingMessage = (Message)incomingValue;
            incomingMessage.ack();
        }
        return MessageUtils.unwrap(incomingValue, expectedParamType);
    }

    private boolean completionStageAwait(Object incomingValue, Object outgoingValue) {
        if (outgoingValue instanceof CompletionStage) {
            this.completableQueue.add(((CompletionStage)outgoingValue).toCompletableFuture(), incomingValue);
            return true;
        }
        return false;
    }

    private Object postProcess(Object incomingValue, Object outgoingValue) {
        Message wrappedOutgoing = (Message)MessageUtils.unwrap(outgoingValue, Message.class);
        if (this.processorMethod.getAckStrategy().equals((Object)Acknowledgment.Strategy.POST_PROCESSING)) {
            Message wrappedIncoming = (Message)MessageUtils.unwrap(incomingValue, Message.class);
            wrappedOutgoing = (Message)MessageUtils.unwrap(outgoingValue, Message.class, () -> ((Message)wrappedIncoming).ack());
        }
        return wrappedOutgoing;
    }

    public void onError(Throwable t) {
        this.subscriber.onError(t);
    }

    public void onComplete() {
        this.subscriber.onComplete();
    }
}

