/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.appbroker.logging.streaming.endpoint;

import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.cloudfoundry.dropsonde.events.Envelope;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.appbroker.logging.streaming.endpoint.ServiceInstanceNotFoundException;
import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLogEvent;
import org.springframework.cloud.appbroker.logging.streaming.events.StartServiceInstanceLoggingEvent;
import org.springframework.cloud.appbroker.logging.streaming.events.StopServiceInstanceLoggingEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.util.UriTemplate;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class StreamingLogWebSocketHandler
implements WebSocketHandler,
ApplicationListener<ServiceInstanceLogEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingLogWebSocketHandler.class);
    private static final UriTemplate LOGGING_URI_TEMPLATE = new UriTemplate("/logs/{serviceInstanceId}/stream");
    private final ApplicationEventPublisher eventPublisher;
    private final ConcurrentMap<String, Sinks.Many<Envelope>> envelopeSinks = new ConcurrentHashMap<String, Sinks.Many<Envelope>>();

    public StreamingLogWebSocketHandler(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    public Mono<Void> handle(WebSocketSession session) {
        String serviceInstanceId = this.getServiceInstanceId(session);
        LOG.info("Connection established [{}], service instance {}", (Object)session.getHandshakeInfo().getRemoteAddress(), (Object)serviceInstanceId);
        Sinks.Many envelopeSink = this.envelopeSinks.computeIfAbsent(serviceInstanceId, s -> Sinks.many().multicast().onBackpressureBuffer());
        this.eventPublisher.publishEvent((ApplicationEvent)new StartServiceInstanceLoggingEvent(this, serviceInstanceId));
        LOG.info("Published event to start streaming logs for service instance with ID {}", (Object)serviceInstanceId);
        return session.send((Publisher)envelopeSink.asFlux().map(envelope -> session.binaryMessage(dataBufferFactory -> dataBufferFactory.wrap(Envelope.ADAPTER.encode(envelope))))).doFinally(signalType -> this.afterConnectionClosed(session, serviceInstanceId)).doOnError(throwable -> LOG.error(String.format("Error handling logging stream for service instance %s", serviceInstanceId), throwable));
    }

    public void onApplicationEvent(ServiceInstanceLogEvent event) {
        this.broadcastLogMessage(event);
    }

    public void broadcastLogMessage(ServiceInstanceLogEvent event) {
        Sinks.Many envelopeSink;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received event to broadcast log message for {}", (Object)event.getServiceInstanceId());
        }
        if ((envelopeSink = (Sinks.Many)this.envelopeSinks.get(event.getServiceInstanceId())) == null) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("No sink found for {}, stopping log streaming", (Object)event.getServiceInstanceId());
            }
            this.eventPublisher.publishEvent((ApplicationEvent)new StopServiceInstanceLoggingEvent(this, event.getServiceInstanceId()));
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending message to client for {}", (Object)event.getServiceInstanceId());
        }
        envelopeSink.tryEmitNext((Object)event.getEnvelope()).orThrow();
    }

    private void afterConnectionClosed(WebSocketSession webSocketSession, String serviceInstanceId) {
        LOG.info("Connection closed [{}], service instance {}", (Object)webSocketSession.getHandshakeInfo().getRemoteAddress(), (Object)serviceInstanceId);
        this.eventPublisher.publishEvent((ApplicationEvent)new StopServiceInstanceLoggingEvent(this, serviceInstanceId));
        Sinks.Many sink = (Sinks.Many)this.envelopeSinks.remove(serviceInstanceId);
        if (sink != null) {
            sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
        }
    }

    private String getServiceInstanceId(WebSocketSession webSocketSession) {
        URI uri = webSocketSession.getHandshakeInfo().getUri();
        Map match = LOGGING_URI_TEMPLATE.match(uri.getPath());
        if (match.isEmpty()) {
            throw new ServiceInstanceNotFoundException();
        }
        return (String)match.get("serviceInstanceId");
    }
}

