/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.messaging.simp.stomp;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import reactor.core.Environment;
import reactor.core.composable.Composable;
import reactor.core.composable.Deferred;
import reactor.core.composable.Promise;
import reactor.core.composable.spec.DeferredPromiseSpec;
import reactor.function.Consumer;
import reactor.tcp.Reconnect;
import reactor.tcp.TcpClient;
import reactor.tcp.TcpConnection;
import reactor.tcp.encoding.Codec;
import reactor.tcp.encoding.DelimitedCodec;
import reactor.tcp.encoding.StandardCodecs;
import reactor.tcp.netty.NettyTcpClient;
import reactor.tcp.spec.TcpClientSpec;
import reactor.tuple.Tuple;
import reactor.tuple.Tuple2;

public class StompBrokerRelayMessageHandler
extends AbstractBrokerMessageHandler {
    private final MessageChannel messageChannel;
    private String relayHost = "127.0.0.1";
    private int relayPort = 61613;
    private String systemLogin = "guest";
    private String systemPasscode = "guest";
    private final StompMessageConverter stompMessageConverter = new StompMessageConverter();
    private Environment environment;
    private TcpClient<String, String> tcpClient;
    private final Map<String, RelaySession> relaySessions = new ConcurrentHashMap<String, RelaySession>();

    public StompBrokerRelayMessageHandler(MessageChannel messageChannel, Collection<String> destinationPrefixes) {
        super(destinationPrefixes);
        Assert.notNull((Object)messageChannel, (String)"messageChannel is required");
        this.messageChannel = messageChannel;
    }

    public void setRelayHost(String relayHost) {
        Assert.hasText((String)relayHost, (String)"relayHost must not be empty");
        this.relayHost = relayHost;
    }

    public String getRelayHost() {
        return this.relayHost;
    }

    public void setRelayPort(int relayPort) {
        this.relayPort = relayPort;
    }

    public int getRelayPort() {
        return this.relayPort;
    }

    public void setSystemLogin(String systemLogin) {
        Assert.hasText((String)systemLogin, (String)"systemLogin must not be empty");
        this.systemLogin = systemLogin;
    }

    public String getSystemLogin() {
        return this.systemLogin;
    }

    public void setSystemPasscode(String systemPasscode) {
        this.systemPasscode = systemPasscode;
    }

    public String getSystemPasscode() {
        return this.systemPasscode;
    }

    @Override
    protected void startInternal() {
        this.environment = new Environment();
        this.tcpClient = (TcpClient)((TcpClientSpec)new TcpClientSpec(NettyTcpClient.class).env(this.environment)).codec((Codec)new DelimitedCodec(0, true, (Codec)StandardCodecs.STRING_CODEC)).connect(this.relayHost, this.relayPort).get();
        if (this.logger.isDebugEnabled()) {
            this.logger.debug((Object)"Initializing \"system\" TCP connection");
        }
        SystemRelaySession session = new SystemRelaySession();
        this.relaySessions.put(session.getId(), session);
        session.connect();
    }

    @Override
    protected void stopInternal() {
        try {
            this.tcpClient.close().await();
        }
        catch (Throwable t) {
            this.logger.error((Object)"Failed to close reactor TCP client", t);
        }
        try {
            this.environment.shutdown();
        }
        catch (Throwable t) {
            this.logger.error((Object)"Failed to shut down reactor Environment", t);
        }
    }

    @Override
    protected void handleMessageInternal(Message<?> message) {
        StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
        String sessionId = headers.getSessionId();
        String destination = headers.getDestination();
        StompCommand command = headers.getCommand();
        SimpMessageType messageType = headers.getMessageType();
        if (SimpMessageType.MESSAGE.equals((Object)messageType)) {
            sessionId = sessionId == null ? "stompRelaySystemSessionId" : sessionId;
            headers.setSessionId(sessionId);
            command = command == null ? StompCommand.SEND : command;
            headers.setCommandIfNotSet(command);
            message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
        }
        if (headers.getCommand() == null) {
            this.logger.error((Object)("No STOMP command, ignoring message: " + message));
            return;
        }
        if (sessionId == null) {
            this.logger.error((Object)("No sessionId, ignoring message: " + message));
            return;
        }
        if (command.requiresDestination() && !this.checkDestinationPrefix(destination)) {
            return;
        }
        try {
            if (SimpMessageType.CONNECT.equals((Object)messageType)) {
                headers.setHeartbeat(0L, 0L);
                message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
                RelaySession session = new RelaySession(sessionId);
                this.relaySessions.put(sessionId, session);
                session.connect(message);
            } else if (SimpMessageType.DISCONNECT.equals((Object)messageType)) {
                RelaySession session = this.relaySessions.remove(sessionId);
                if (session == null) {
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace((Object)("Session already removed, sessionId=" + sessionId));
                    }
                    return;
                }
                session.forward(message);
            } else {
                RelaySession session = this.relaySessions.get(sessionId);
                if (session == null) {
                    this.logger.warn((Object)("Session id=" + sessionId + " not found. Ignoring message: " + message));
                    return;
                }
                session.forward(message);
            }
        }
        catch (Throwable t) {
            this.logger.error((Object)("Failed to handle message " + message), t);
        }
    }

    private class SystemRelaySession
    extends RelaySession {
        public static final String ID = "stompRelaySystemSessionId";

        public SystemRelaySession() {
            super(ID);
        }

        public void connect() {
            StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
            headers.setAcceptVersion("1.1,1.2");
            headers.setLogin(StompBrokerRelayMessageHandler.this.systemLogin);
            headers.setPasscode(StompBrokerRelayMessageHandler.this.systemPasscode);
            headers.setHeartbeat(0L, 0L);
            Message<byte[]> connectMessage = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
            super.connect(connectMessage);
        }

        @Override
        protected Composable<TcpConnection<String, String>> openTcpConnection() {
            return StompBrokerRelayMessageHandler.this.tcpClient.open(new Reconnect(){

                public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
                    return Tuple.of((Object)address, (Object)5000L);
                }
            });
        }

        @Override
        protected void sendMessageToClient(Message<?> message) {
            StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
            if (StompCommand.ERROR.equals((Object)headers.getCommand()) && StompBrokerRelayMessageHandler.this.logger.isErrorEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.error((Object)("System session received ERROR frame from broker: " + message));
            }
        }
    }

    private static class StompConnection {
        private volatile TcpConnection<String, String> connection;
        private AtomicReference<TcpConnection<String, String>> readyConnection = new AtomicReference();

        private StompConnection() {
        }

        public void setTcpConnection(TcpConnection<String, String> connection) {
            Assert.notNull(connection, (String)"connection must not be null");
            this.connection = connection;
        }

        public TcpConnection<String, String> getReadyConnection() {
            return this.readyConnection.get();
        }

        public void setReady() {
            this.readyConnection.set(this.connection);
        }

        public boolean isReady() {
            return this.readyConnection.get() != null;
        }

        public void setDisconnected() {
            this.readyConnection.set(null);
            this.connection = null;
        }

        public String toString() {
            return "StompConnection [ready=" + this.isReady() + "]";
        }
    }

    private class RelaySession {
        private final String sessionId;
        private final BlockingQueue<Message<?>> messageQueue = new LinkedBlockingQueue(50);
        private volatile StompConnection stompConnection = new StompConnection();
        private final Object monitor = new Object();

        private RelaySession(String sessionId) {
            Assert.notNull((Object)sessionId, (String)"sessionId is required");
            this.sessionId = sessionId;
        }

        public String getId() {
            return this.sessionId;
        }

        public void connect(final Message<?> connectMessage) {
            Assert.notNull(connectMessage, (String)"connectMessage is required");
            Composable<TcpConnection<String, String>> connectionComposable = this.openTcpConnection();
            connectionComposable.consume((Consumer)new Consumer<TcpConnection<String, String>>(){

                public void accept(TcpConnection<String, String> connection) {
                    RelaySession.this.handleTcpConnection(connection, connectMessage);
                }
            });
            connectionComposable.when(Throwable.class, (Consumer)new Consumer<Throwable>(){

                public void accept(Throwable ex) {
                    StompBrokerRelayMessageHandler.this.relaySessions.remove(RelaySession.this.sessionId);
                    RelaySession.this.handleTcpClientFailure("Failed to connect to message broker", ex);
                }
            });
        }

        protected Composable<TcpConnection<String, String>> openTcpConnection() {
            return StompBrokerRelayMessageHandler.this.tcpClient.open();
        }

        protected void handleTcpConnection(TcpConnection<String, String> tcpConn, Message<?> connectMessage) {
            this.stompConnection.setTcpConnection(tcpConn);
            tcpConn.in().consume((Consumer)new Consumer<String>(){

                public void accept(String message) {
                    RelaySession.this.readStompFrame(message);
                }
            });
            this.forwardInternal(tcpConn, connectMessage);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void readStompFrame(String stompFrame) {
            StompHeaderAccessor headers;
            if (StringUtils.isEmpty((Object)stompFrame)) {
                return;
            }
            Message<?> message = StompBrokerRelayMessageHandler.this.stompMessageConverter.toMessage(stompFrame);
            if (StompBrokerRelayMessageHandler.this.logger.isTraceEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.trace((Object)("Reading message " + message));
            }
            if (StompCommand.CONNECTED == (headers = StompHeaderAccessor.wrap(message)).getCommand()) {
                Object object = this.monitor;
                synchronized (object) {
                    this.stompConnection.setReady();
                    StompBrokerRelayMessageHandler.this.publishBrokerAvailableEvent();
                    this.flushMessages();
                }
                return;
            }
            headers.setSessionId(this.sessionId);
            message = MessageBuilder.withPayloadAndHeaders(message.getPayload(), headers).build();
            this.sendMessageToClient(message);
        }

        private void handleTcpClientFailure(String message, Throwable ex) {
            if (StompBrokerRelayMessageHandler.this.logger.isErrorEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.error((Object)(message + ", sessionId=" + this.sessionId), ex);
            }
            this.stompConnection.setDisconnected();
            this.sendError(message);
            StompBrokerRelayMessageHandler.this.publishBrokerUnavailableEvent();
        }

        private void sendError(String errorText) {
            StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR);
            headers.setSessionId(this.sessionId);
            headers.setMessage(errorText);
            Message<byte[]> errorMessage = MessageBuilder.withPayloadAndHeaders(new byte[0], headers).build();
            this.sendMessageToClient(errorMessage);
        }

        protected void sendMessageToClient(Message<?> message) {
            StompBrokerRelayMessageHandler.this.messageChannel.send(message);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void forward(Message<?> message) {
            if (!this.stompConnection.isReady()) {
                Object object = this.monitor;
                synchronized (object) {
                    if (!this.stompConnection.isReady()) {
                        this.messageQueue.add(message);
                        if (StompBrokerRelayMessageHandler.this.logger.isTraceEnabled()) {
                            StompBrokerRelayMessageHandler.this.logger.trace((Object)("Not connected, message queued. Queue size=" + this.messageQueue.size()));
                        }
                        return;
                    }
                }
            }
            if (this.messageQueue.isEmpty()) {
                this.forwardInternal(message);
            } else {
                this.messageQueue.add(message);
                this.flushMessages();
            }
        }

        private boolean forwardInternal(Message<?> message) {
            TcpConnection<String, String> tcpConnection = this.stompConnection.getReadyConnection();
            if (tcpConnection == null) {
                return false;
            }
            return this.forwardInternal(tcpConnection, message);
        }

        private boolean forwardInternal(TcpConnection<String, String> tcpConnection, Message<?> message) {
            if (StompBrokerRelayMessageHandler.this.logger.isTraceEnabled()) {
                StompBrokerRelayMessageHandler.this.logger.trace((Object)("Forwarding to STOMP broker, message: " + message));
            }
            byte[] bytes = StompBrokerRelayMessageHandler.this.stompMessageConverter.fromMessage(message);
            String payload = new String(bytes, Charset.forName("UTF-8"));
            final Deferred deferred = (Deferred)new DeferredPromiseSpec().get();
            tcpConnection.send((Object)payload, (Consumer)new Consumer<Boolean>(){

                public void accept(Boolean success) {
                    deferred.accept((Object)success);
                }
            });
            Boolean success = null;
            try {
                success = (Boolean)((Promise)deferred.compose()).await();
                if (success == null) {
                    this.handleTcpClientFailure("Timed out waiting for message to be forwarded to the broker", null);
                } else if (!success.booleanValue() && StompHeaderAccessor.wrap(message).getCommand() != StompCommand.DISCONNECT) {
                    this.handleTcpClientFailure("Failed to forward message to the broker", null);
                }
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                this.handleTcpClientFailure("Interrupted while forwarding message to the broker", ex);
            }
            return success != null ? success : false;
        }

        private void flushMessages() {
            ArrayList messages = new ArrayList();
            this.messageQueue.drainTo(messages);
            for (Message message : messages) {
                if (this.forwardInternal(message)) continue;
                return;
            }
        }
    }
}

