/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.netflix.turbine.stream;

import com.netflix.turbine.aggregator.InstanceKey;
import com.netflix.turbine.aggregator.StreamAggregator;
import com.netflix.turbine.internal.JsonUtility;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.server.HttpServer;
import io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.actuator.HasFeatures;
import org.springframework.cloud.netflix.turbine.stream.TurbineStreamProperties;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.SocketUtils;
import rx.Observable;
import rx.subjects.PublishSubject;

@Configuration
@EnableConfigurationProperties(value={TurbineStreamProperties.class})
public class TurbineStreamConfiguration
implements SmartLifecycle {
    private static final Log log = LogFactory.getLog(TurbineStreamConfiguration.class);
    private AtomicBoolean running = new AtomicBoolean(false);
    @Autowired
    private TurbineStreamProperties properties;
    private int turbinePort;

    @Bean
    public HasFeatures Feature() {
        return HasFeatures.namedFeature((String)"Turbine (Stream)", TurbineStreamProperties.class);
    }

    @Bean
    public PublishSubject<Map<String, Object>> hystrixSubject() {
        return PublishSubject.create();
    }

    @Bean
    public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
        Observable publishedStreams = StreamAggregator.aggregateGroupedStreams((Observable)this.hystrixSubject().groupBy(data -> InstanceKey.create((String)((String)data.get("instanceId"))))).doOnUnsubscribe(() -> log.info((Object)"Unsubscribing aggregation.")).doOnSubscribe(() -> log.info((Object)"Starting aggregation")).flatMap(o -> o).publish().refCount();
        Observable ping = Observable.timer((long)1L, (long)10L, (TimeUnit)TimeUnit.SECONDS).map(count -> Collections.singletonMap("type", "Ping")).publish().refCount();
        Observable output = Observable.merge((Observable)publishedStreams, (Observable)ping);
        this.turbinePort = this.properties.getPort();
        if (this.turbinePort <= 0) {
            this.turbinePort = SocketUtils.findAvailableTcpPort((int)40000);
        }
        HttpServer httpServer = RxNetty.createHttpServer((int)this.turbinePort, (request, response) -> {
            log.info((Object)"SSE Request Received");
            response.getHeaders().setHeader("Content-Type", (Object)"text/event-stream");
            return output.doOnUnsubscribe(() -> log.info((Object)"Unsubscribing RxNetty server connection")).flatMap(data -> response.writeAndFlush((Object)new ServerSentEvent(Unpooled.copiedBuffer((CharSequence)JsonUtility.mapToJson((Map)data), (Charset)StandardCharsets.UTF_8))));
        }, (PipelineConfigurator)PipelineConfigurators.serveSseConfigurator());
        return httpServer;
    }

    public boolean isAutoStartup() {
        return true;
    }

    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.aggregatorServer().start();
        }
    }

    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            try {
                this.aggregatorServer().shutdown();
            }
            catch (InterruptedException ex) {
                log.error((Object)"Error shutting down", (Throwable)ex);
            }
        }
    }

    public boolean isRunning() {
        return this.running.get();
    }

    public int getPhase() {
        return 0;
    }

    public int getTurbinePort() {
        return this.turbinePort;
    }
}

