/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.app.gpfdist.sink;

import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.springframework.cloud.stream.app.gpfdist.sink.GpfdistCodec;
import reactor.core.processor.RingBufferWorkProcessor;
import reactor.fn.BiFunction;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.NetStreams;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.Spec;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.HttpServer;
import reactor.rx.Stream;
import reactor.rx.Streams;

public class GpfdistServer {
    private static final Log log = LogFactory.getLog(GpfdistServer.class);
    private final Processor<Buffer, Buffer> processor;
    private final int port;
    private final int flushCount;
    private final int flushTime;
    private final int batchTimeout;
    private final int batchCount;
    private HttpServer<Buffer, Buffer> server;
    private int localPort = -1;

    public GpfdistServer(Processor<Buffer, Buffer> processor, int port, int flushCount, int flushTime, int batchTimeout, int batchCount) {
        this.processor = processor;
        this.port = port;
        this.flushCount = flushCount;
        this.flushTime = flushTime;
        this.batchTimeout = batchTimeout;
        this.batchCount = batchCount;
    }

    public synchronized HttpServer<Buffer, Buffer> start() throws Exception {
        if (this.server == null) {
            this.server = this.createProtocolListener();
        }
        return this.server;
    }

    public synchronized void stop() throws Exception {
        if (this.server != null) {
            this.server.shutdown().awaitSuccess();
        }
        this.server = null;
    }

    public int getLocalPort() {
        return this.localPort;
    }

    private HttpServer<Buffer, Buffer> createProtocolListener() throws Exception {
        final Stream stream = Streams.wrap(this.processor).window(this.flushCount, (long)this.flushTime, TimeUnit.SECONDS).flatMap((Function)new Function<Stream<Buffer>, Publisher<Buffer>>(){

            public Publisher<Buffer> apply(Stream<Buffer> t) {
                return t.reduce((Object)new Buffer(), (BiFunction)new BiFunction<Buffer, Buffer, Buffer>(){

                    public Buffer apply(Buffer prev, Buffer next) {
                        return prev.append(new Buffer[]{next});
                    }
                });
            }
        }).process((Processor)RingBufferWorkProcessor.create((String)"gpfdist-sink-worker", (int)8192, (boolean)false));
        HttpServer httpServer = NetStreams.httpServer((Function)new Function<Spec.HttpServerSpec<Buffer, Buffer>, Spec.HttpServerSpec<Buffer, Buffer>>(){

            public Spec.HttpServerSpec<Buffer, Buffer> apply(Spec.HttpServerSpec<Buffer, Buffer> server) {
                return (Spec.HttpServerSpec)((Spec.HttpServerSpec)server.codec((Codec)new GpfdistCodec())).listen(GpfdistServer.this.port);
            }
        });
        httpServer.get("/data", (ReactorChannelHandler)new ReactorChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>>(){

            public Publisher<Void> apply(HttpChannel<Buffer, Buffer> request) {
                request.responseHeaders().removeTransferEncodingChunked();
                request.addResponseHeader("Content-type", "text/plain");
                request.addResponseHeader("Expires", "0");
                request.addResponseHeader("X-GPFDIST-VERSION", "Spring Dataflow");
                request.addResponseHeader("X-GP-PROTO", "1");
                request.addResponseHeader("Cache-Control", "no-cache");
                request.addResponseHeader("Connection", "close");
                return request.writeWith((Publisher)stream.take((long)GpfdistServer.this.batchCount).timeout((long)GpfdistServer.this.batchTimeout, TimeUnit.SECONDS, (Publisher)Streams.empty()).concatWith((Publisher)Streams.just((Object)Buffer.wrap((byte[])new byte[0])))).capacity(1L);
            }
        });
        httpServer.start().awaitSuccess();
        log.info((Object)("Server running using address=[" + httpServer.getListenAddress() + "]"));
        this.localPort = httpServer.getListenAddress().getPort();
        return httpServer;
    }
}

