/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.smallrye.reactivemessaging.runtime.devmode;

import io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactory;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import jakarta.annotation.Priority;
import jakarta.interceptor.AroundInvoke;
import jakarta.interceptor.Interceptor;
import jakarta.interceptor.InvocationContext;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@Interceptor
@DevModeSupportConnectorFactory
@Priority(value=10)
public class DevModeSupportConnectorFactoryInterceptor {
    private static volatile Supplier<CompletableFuture<Boolean>> onMessage;

    static void register(Supplier<CompletableFuture<Boolean>> onMessage) {
        DevModeSupportConnectorFactoryInterceptor.onMessage = onMessage;
    }

    @AroundInvoke
    public Object intercept(InvocationContext ctx) throws Exception {
        if (onMessage == null) {
            return ctx.proceed();
        }
        if (ctx.getMethod().getName().equals("getPublisherBuilder")) {
            PublisherBuilder result = (PublisherBuilder)ctx.proceed();
            return result.flatMapCompletionStage(msg -> {
                CompletableFuture future = new CompletableFuture();
                onMessage.get().whenComplete((restarted, error) -> {
                    if (!restarted.booleanValue()) {
                        future.complete(msg);
                    }
                });
                return future;
            });
        }
        if (ctx.getMethod().getName().equals("getPublisher")) {
            Flow.Publisher result = (Flow.Publisher)ctx.proceed();
            return Multi.createFrom().publisher(result).onItem().transformToUniAndConcatenate(msg -> Uni.createFrom().emitter(e -> onMessage.get().whenComplete((restarted, error) -> {
                if (!restarted.booleanValue()) {
                    e.complete(msg);
                }
            })));
        }
        if (ctx.getMethod().getName().equals("getSubscriberBuilder")) {
            final SubscriberBuilder result = (SubscriberBuilder)ctx.proceed();
            return ReactiveStreams.fromSubscriber((Subscriber)new Subscriber<Message<?>>(){
                private Subscriber<Message<?>> subscriber;

                public void onSubscribe(Subscription s) {
                    this.subscriber = result.build();
                    this.subscriber.onSubscribe(s);
                }

                public void onNext(Message<?> o) {
                    this.subscriber.onNext(o);
                    onMessage.get();
                }

                public void onError(Throwable t) {
                    this.subscriber.onError(t);
                    onMessage.get();
                }

                public void onComplete() {
                    this.subscriber.onComplete();
                    onMessage.get();
                }
            });
        }
        if (ctx.getMethod().getName().equals("getSubscriber")) {
            final Flow.Subscriber result = (Flow.Subscriber)ctx.proceed();
            return new Flow.Subscriber<Message<?>>(){
                private Flow.Subscriber<Message<?>> subscriber;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    this.subscriber = result;
                    this.subscriber.onSubscribe(s);
                }

                @Override
                public void onNext(Message<?> o) {
                    this.subscriber.onNext(o);
                    onMessage.get();
                }

                @Override
                public void onError(Throwable t) {
                    this.subscriber.onError(t);
                    onMessage.get();
                }

                @Override
                public void onComplete() {
                    this.subscriber.onComplete();
                    onMessage.get();
                }
            };
        }
        return ctx.proceed();
    }
}

