/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.appbroker.logging.streaming.endpoint;

import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.cloudfoundry.dropsonde.events.Envelope;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.appbroker.logging.streaming.endpoint.ServiceInstanceNotFoundException;
import org.springframework.cloud.appbroker.logging.streaming.events.ServiceInstanceLogEvent;
import org.springframework.cloud.appbroker.logging.streaming.events.StartServiceInstanceLoggingEvent;
import org.springframework.cloud.appbroker.logging.streaming.events.StopServiceInstanceLoggingEvent;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.util.UriTemplate;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Mono;

public class StreamingLogWebSocketHandler
implements WebSocketHandler,
ApplicationListener<ServiceInstanceLogEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingLogWebSocketHandler.class);
    private static final UriTemplate LOGGING_URI_TEMPLATE = new UriTemplate("/logs/{serviceInstanceId}/stream");
    private final ApplicationEventPublisher eventPublisher;
    private final ConcurrentHashMap<String, EmitterProcessor<Envelope>> processors = new ConcurrentHashMap();

    public StreamingLogWebSocketHandler(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    public Mono<Void> handle(WebSocketSession session) {
        String serviceInstanceId = this.getServiceInstanceId(session);
        LOG.info("Connection established [{}}], service instance {}", (Object)session.getHandshakeInfo().getRemoteAddress(), (Object)serviceInstanceId);
        EmitterProcessor processor = this.processors.computeIfAbsent(serviceInstanceId, s -> EmitterProcessor.create());
        this.eventPublisher.publishEvent((ApplicationEvent)new StartServiceInstanceLoggingEvent(this, serviceInstanceId));
        LOG.info("Published event to start streaming logs for service instance with ID {}", (Object)serviceInstanceId);
        return session.send((Publisher)processor.map(envelope -> session.binaryMessage(dataBufferFactory -> dataBufferFactory.wrap(Envelope.ADAPTER.encode(envelope))))).then().doFinally(signalType -> this.afterConnectionClosed(session)).doOnError(throwable -> LOG.error("Error handling logging stream for service instance " + serviceInstanceId, throwable));
    }

    public void onApplicationEvent(ServiceInstanceLogEvent event) {
        this.broadcastLogMessage(event);
    }

    public void broadcastLogMessage(ServiceInstanceLogEvent event) {
        EmitterProcessor<Envelope> processor;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received event to broadcast log message for " + event.getServiceInstanceId());
        }
        if ((processor = this.processors.get(event.getServiceInstanceId())) == null) {
            if (LOG.isWarnEnabled()) {
                LOG.warn("No processor found for {}, stopping log streaming", (Object)event.getServiceInstanceId());
            }
            this.eventPublisher.publishEvent((ApplicationEvent)new StopServiceInstanceLoggingEvent(this, event.getServiceInstanceId()));
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending message to client for {}", (Object)event.getServiceInstanceId());
        }
        processor.onNext((Object)event.getEnvelope());
    }

    private void afterConnectionClosed(WebSocketSession webSocketSession) {
        if (LOG.isInfoEnabled()) {
            LOG.info("Connection closed [" + webSocketSession.getHandshakeInfo().getRemoteAddress() + "]");
        }
        String serviceInstanceId = this.getServiceInstanceId(webSocketSession);
        this.eventPublisher.publishEvent((ApplicationEvent)new StopServiceInstanceLoggingEvent(this, serviceInstanceId));
        this.processors.computeIfPresent(serviceInstanceId, (s, envelopeEmitterProcessor) -> null);
    }

    private String getServiceInstanceId(WebSocketSession webSocketSession) {
        URI uri = webSocketSession.getHandshakeInfo().getUri();
        Map match = LOGGING_URI_TEMPLATE.match(uri.getPath());
        if (match.isEmpty()) {
            throw new ServiceInstanceNotFoundException();
        }
        return (String)match.get("serviceInstanceId");
    }
}

