/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.app.websocket.sink;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.security.cert.CertificateException;
import java.util.LinkedHashMap;
import javax.annotation.PostConstruct;
import javax.net.ssl.SSLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.trace.InMemoryTraceRepository;
import org.springframework.boot.actuate.trace.TraceRepository;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.app.websocket.sink.WebsocketSinkProperties;
import org.springframework.cloud.stream.app.websocket.sink.WebsocketSinkServer;
import org.springframework.cloud.stream.app.websocket.sink.WebsocketSinkServerInitializer;
import org.springframework.cloud.stream.app.websocket.sink.actuator.WebsocketSinkTraceEndpoint;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;

@Configuration
@EnableConfigurationProperties(value={WebsocketSinkProperties.class})
@EnableBinding(value={Sink.class})
public class WebsocketSinkConfiguration {
    private static final Log logger = LogFactory.getLog(WebsocketSinkConfiguration.class);
    @Value(value="${endpoints.websocketsinktrace.enabled:false}")
    private boolean traceEndpointEnabled;
    private final TraceRepository websocketTraceRepository = new InMemoryTraceRepository();

    @PostConstruct
    public void init() throws InterruptedException, CertificateException, SSLException {
        this.server().run();
    }

    @Bean
    public WebsocketSinkServer server() {
        return new WebsocketSinkServer();
    }

    @Bean
    public WebsocketSinkServerInitializer initializer() {
        return new WebsocketSinkServerInitializer(this.websocketTraceRepository);
    }

    @Bean
    @ConditionalOnProperty(value={"endpoints.websocketsinktrace.enabled"}, havingValue="true")
    public WebsocketSinkTraceEndpoint websocketTraceEndpoint() {
        return new WebsocketSinkTraceEndpoint(this.websocketTraceRepository);
    }

    @ServiceActivator(inputChannel="input")
    public void websocketSink(Message<?> message) {
        if (logger.isTraceEnabled()) {
            logger.trace((Object)String.format("Handling message: %s", message));
        }
        SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
        headers.setMessageTypeIfNotSet(SimpMessageType.MESSAGE);
        String messagePayload = message.getPayload().toString();
        for (Channel channel : WebsocketSinkServer.channels) {
            if (logger.isTraceEnabled()) {
                logger.trace((Object)String.format("Writing message %s to channel %s", messagePayload, channel.localAddress()));
            }
            channel.write((Object)new TextWebSocketFrame(messagePayload));
            channel.flush();
        }
        if (this.traceEndpointEnabled) {
            this.addMessageToTraceRepository(message);
        }
    }

    private void addMessageToTraceRepository(Message<?> message) {
        LinkedHashMap<String, Object> trace = new LinkedHashMap<String, Object>();
        trace.put("type", "text");
        trace.put("direction", "out");
        trace.put("id", message.getHeaders().getId());
        trace.put("payload", message.getPayload().toString());
        this.websocketTraceRepository.add(trace);
    }
}

