/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.web.rx.integration;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import org.noear.solon.Utils;
import org.noear.solon.core.handle.Context;
import org.noear.solon.core.handle.Entity;
import org.noear.solon.core.util.KeyValues;
import org.noear.solon.rx.Completable;
import org.noear.solon.rx.handle.RxContext;
import org.noear.solon.rx.handle.RxHandler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/**
 * {@link RxHandler} that drains a reactive {@link Publisher} and renders every
 * emitted item through the request {@link Context}.
 *
 * <p>Items are processed strictly in order ({@code concatMap}). Each item is
 * expanded before rendering:
 * <ul>
 *   <li>an {@link Entity} contributes its status/headers (if headers have not
 *       been sent yet) and is replaced by its body;</li>
 *   <li>a {@link Publisher} is flattened into its own items;</li>
 *   <li>anything else is rendered as-is.</li>
 * </ul>
 *
 * <p>When {@code isStreaming} is set, a single {@code "\n"} is written and the
 * context is flushed after every rendered item.
 */
public class RxHandlerImpl implements RxHandler {
    /** Line-feed delimiter written after each rendered item in streaming mode. */
    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

    private final Flux<?> publisher;
    private final boolean isStreaming;

    /**
     * @param publisher   source of the items to render; wrapped as a {@link Flux}
     *                    unless it already is one
     * @param isStreaming whether to emit a line feed and flush after each item
     */
    public RxHandlerImpl(Publisher<?> publisher, boolean isStreaming) {
        if (publisher instanceof Flux) {
            this.publisher = (Flux<?>) publisher;
        } else {
            this.publisher = Flux.from(publisher);
        }
        this.isStreaming = isStreaming;
    }

    /**
     * Subscribes to the publisher and renders each item to the context obtained
     * from {@code rxCtx}, starting async mode on that context first if it has
     * not been started yet.
     *
     * @param rxCtx reactive context wrapping the current request
     * @return a {@link Completable} whose emitter is completed when the publisher
     *         completes, or failed with the upstream/rendering error
     */
    @Override
    public Completable handle(RxContext rxCtx) {
        return Completable.create(emitter -> {
            Context ctx = rxCtx.toContext();
            if (!ctx.asyncStarted()) {
                ctx.asyncStart();
            }

            Flux<Object> items = this.publisher.concatMap(item -> this.expand(ctx, item));

            // Terminal signals are routed through subscribe(...) rather than
            // doOnError(...).subscribe(): a subscriber without an error callback makes
            // Reactor additionally report every failure as "error callback not implemented".
            items.subscribe(
                    item -> this.write(ctx, item),
                    err -> emitter.onError(err),
                    () -> emitter.onComplete());
        });
    }

    /**
     * Expands one upstream item into the sequence of objects to render.
     */
    private Flux<Object> expand(Context ctx, Object item) {
        if (item instanceof Entity) {
            return this.getEntityBody(ctx, (Entity) item);
        }
        return toFlux(item);
    }

    /**
     * Renders one item; in streaming mode also writes the delimiter and flushes.
     * Any failure is rethrown wrapped in a {@link RuntimeException} so that it
     * surfaces as the subscription's error signal.
     */
    private void write(Context ctx, Object item) {
        try {
            ctx.render(item);
            if (this.isStreaming) {
                ctx.output(NEWLINE);
                ctx.flush();
            }
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Applies the entity's status and headers to the context (only while headers
     * have not been sent) and returns the entity body as a flux.
     *
     * <p>A status is applied only when it is greater than zero. Header entries
     * without values are skipped; entries with several values are appended one
     * by one via {@code headerAdd}, single-valued entries via {@code headerSet}.
     */
    private Flux<Object> getEntityBody(Context ctx, Entity entity) {
        Object data = entity.body();

        if (!ctx.isHeadersSent()) {
            if (entity.status() > 0) {
                ctx.status(entity.status());
            }

            if (!entity.headers().isEmpty()) {
                for (KeyValues<String> kv : entity.headers()) {
                    if (!Utils.isNotEmpty(kv.getValues())) {
                        continue;
                    }

                    if (kv.getValues().size() > 1) {
                        for (String val : kv.getValues()) {
                            ctx.headerAdd(kv.getKey(), val);
                        }
                    } else {
                        ctx.headerSet(kv.getKey(), kv.getFirstValue());
                    }
                }
            }
        }

        return toFlux(data);
    }

    /**
     * Converts a value into a flux: {@code null} becomes empty, a
     * {@link Publisher} is adapted, and any other value becomes a single item.
     */
    private static Flux<Object> toFlux(Object value) {
        if (value == null) {
            return Flux.empty();
        }
        if (value instanceof Publisher) {
            return Flux.from((Publisher<?>) value);
        }
        return Flux.just(value);
    }
}

