/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.web.rx.integration;

import org.noear.solon.Solon;
import org.noear.solon.core.handle.Action;
import org.noear.solon.core.handle.ActionReturnHandler;
import org.noear.solon.core.handle.Context;
import org.noear.solon.core.util.ClassUtil;
import org.noear.solon.core.util.MimeType;
import org.noear.solon.rx.handle.RxChainManager;
import org.noear.solon.web.rx.integration.ActionRxHandler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

public class ActionReturnRxHandler
implements ActionReturnHandler {
    private final boolean hasReactor = ClassUtil.hasClass(() -> Flux.class);
    private RxChainManager<Context> chainManager = (RxChainManager)Solon.context().getBean("RxChainManager<Context>");

    public boolean matched(Context ctx, Class<?> returnType) {
        return Publisher.class.isAssignableFrom(returnType);
    }

    public void returnHandle(Context ctx, Action action, Object result) throws Throwable {
        if (result != null) {
            if (!ctx.asyncSupported()) {
                throw new IllegalStateException("This boot plugin does not support asynchronous mode");
            }
            if (!ctx.asyncStarted()) {
                ctx.asyncStart(-1L, null);
            }
            boolean isStreaming = this.isStreaming(ctx);
            Publisher publisher = this.postPublisher(ctx, action, result, isStreaming);
            ActionRxHandler handler = new ActionRxHandler(action, publisher, isStreaming);
            this.chainManager.doFilter(ctx, handler).doOnError(err -> {
                try {
                    ctx.status(500);
                }
                finally {
                    if (ctx.asyncSupported()) {
                        ctx.asyncComplete();
                    }
                }
            }).doOnComplete(() -> {
                if (ctx.asyncSupported()) {
                    ctx.asyncComplete();
                }
            }).subscribe();
        }
    }

    protected boolean isStreaming(Context ctx) {
        return MimeType.isStreaming((String)ctx.acceptNew());
    }

    protected Publisher postPublisher(Context ctx, Action action, Object result, boolean isStreaming) throws Throwable {
        if (this.hasReactor && result instanceof Flux && !isStreaming) {
            return ((Flux)result).collectList();
        }
        return (Publisher)result;
    }
}

