/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.microprofile.messaging;

import io.helidon.microprofile.messaging.IncomingMethod;
import io.helidon.microprofile.messaging.MessageUtils;
import io.helidon.microprofile.messaging.MessagingException;
import java.lang.reflect.Method;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class InternalSubscriber
implements Subscriber<Object> {
    private static final Logger LOGGER = Logger.getLogger(InternalSubscriber.class.getName());
    private Subscription subscription;
    private final IncomingMethod incomingMethod;

    InternalSubscriber(IncomingMethod incomingMethod) {
        this.incomingMethod = incomingMethod;
    }

    public void onSubscribe(Subscription s) {
        this.subscription = s;
        this.subscription.request(Long.MAX_VALUE);
    }

    public void onNext(Object message) {
        Method method = this.incomingMethod.getMethod();
        try {
            Class<?> paramType = method.getParameterTypes()[0];
            Object preProcessedMessage = this.preProcess(message, paramType);
            Object methodResult = method.invoke(this.incomingMethod.getBeanInstance(), preProcessedMessage);
            this.postProcess(message, methodResult);
        }
        catch (Exception e) {
            this.subscription.cancel();
            LOGGER.log(Level.SEVERE, e, () -> "Error when invoking @Incoming method " + method.getName());
        }
    }

    private Object preProcess(Object incomingValue, Class<?> expectedParamType) {
        if (this.incomingMethod.getAckStrategy().equals((Object)Acknowledgment.Strategy.PRE_PROCESSING) && incomingValue instanceof Message) {
            Message incomingMessage = (Message)incomingValue;
            incomingMessage.ack();
        }
        return MessageUtils.unwrap(incomingValue, expectedParamType);
    }

    private void postProcess(Object incomingValue, Object outgoingValue) {
        if (this.incomingMethod.getAckStrategy().equals((Object)Acknowledgment.Strategy.POST_PROCESSING) && incomingValue instanceof Message) {
            Message incomingMessage = (Message)incomingValue;
            incomingMessage.ack();
        }
    }

    public void onError(Throwable t) {
        throw new MessagingException(t);
    }

    public void onComplete() {
    }
}

