/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.messaging.simp.stomp;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.ResolvableType;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.messaging.simp.stomp.ConnectionHandlingStompSession;
import org.springframework.messaging.simp.stomp.ConnectionLostException;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompDecoder;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.AlternativeJdkIdGenerator;
import org.springframework.util.Assert;
import org.springframework.util.IdGenerator;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SettableListenableFuture;

/**
 * Default {@link ConnectionHandlingStompSession} implementation.
 *
 * <p>Sends STOMP frames over the {@link TcpConnection} handed to
 * {@link #afterConnected(TcpConnection)}, dispatches incoming frames to the
 * registered {@link StompFrameHandler}s / the {@link StompSessionHandler}, and
 * keeps track of active subscriptions and of pending receipts.
 */
public class DefaultStompSession
implements ConnectionHandlingStompSession {
    private static final Log logger = LogFactory.getLog(DefaultStompSession.class);
    private static final IdGenerator idGenerator = new AlternativeJdkIdGenerator();

    /** Shared zero-length payload used for frames that carry no body. */
    public static final byte[] EMPTY_PAYLOAD = new byte[0];

    /** Factor applied to the negotiated read interval before the server is considered silent. */
    private static final long HEARTBEAT_MULTIPLIER = 3L;

    /** Pre-built heartbeat message sent by {@link WriteInactivityTask}. */
    private static final Message<byte[]> HEARTBEAT;

    static {
        StompHeaderAccessor accessor = StompHeaderAccessor.createForHeartbeat();
        HEARTBEAT = MessageBuilder.createMessage(StompDecoder.HEARTBEAT_PAYLOAD, accessor.getMessageHeaders());
    }

    private final String sessionId;
    private final StompSessionHandler sessionHandler;
    private final StompHeaders connectHeaders;
    private final SettableListenableFuture<StompSession> sessionFuture = new SettableListenableFuture<StompSession>();
    private MessageConverter converter = new SimpleMessageConverter();
    private TaskScheduler taskScheduler;

    /** Time to wait for a RECEIPT frame, in milliseconds. */
    private long receiptTimeLimit = 15000L;
    private volatile boolean autoReceiptEnabled;

    /** The current connection, or {@code null} when not connected. */
    private volatile TcpConnection<byte[]> connection;

    /** Value of the "version" header of the CONNECTED frame. */
    private volatile String version;
    private final AtomicInteger subscriptionIndex = new AtomicInteger();
    private final Map<String, DefaultSubscription> subscriptions = new ConcurrentHashMap<String, DefaultSubscription>(4);
    private final AtomicInteger receiptIndex = new AtomicInteger();
    private final Map<String, ReceiptHandler> receiptHandlers = new ConcurrentHashMap<String, ReceiptHandler>(4);

    /** Set once this session itself initiates closing, so the close is not reported as a lost connection. */
    private volatile boolean closing = false;

    /**
     * Create a new session with a freshly generated session id.
     * @param sessionHandler the application handler for the session (required)
     * @param connectHeaders headers to add to the STOMP CONNECT frame (required)
     */
    public DefaultStompSession(StompSessionHandler sessionHandler, StompHeaders connectHeaders) {
        Assert.notNull(sessionHandler, "'sessionHandler' is required.");
        Assert.notNull(connectHeaders, "'connectHeaders' is required.");
        this.sessionId = idGenerator.generateId().toString();
        this.sessionHandler = sessionHandler;
        this.connectHeaders = connectHeaders;
    }

    @Override
    public String getSessionId() {
        return this.sessionId;
    }

    /**
     * Return the configured session handler.
     */
    public StompSessionHandler getSessionHandler() {
        return this.sessionHandler;
    }

    /**
     * Return the future that is completed with this session once a CONNECTED
     * frame is handled, or completed exceptionally on a connect/transport failure.
     */
    @Override
    public ListenableFuture<StompSession> getSessionFuture() {
        return this.sessionFuture;
    }

    /**
     * Set the {@link MessageConverter} used to convert payloads of outgoing
     * and incoming frames. Defaults to a {@link SimpleMessageConverter}.
     * @param messageConverter the converter to use, never {@code null}
     */
    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull(messageConverter, "'messageConverter' must not be null");
        this.converter = messageConverter;
    }

    /**
     * Return the configured {@link MessageConverter}.
     */
    public MessageConverter getMessageConverter() {
        return this.converter;
    }

    /**
     * Configure the {@link TaskScheduler} used to time out pending receipts.
     * A scheduler is required as soon as a frame is sent with a receipt.
     */
    public void setTaskScheduler(TaskScheduler taskScheduler) {
        this.taskScheduler = taskScheduler;
    }

    /**
     * Return the configured {@link TaskScheduler}, possibly {@code null}.
     */
    public TaskScheduler getTaskScheduler() {
        return this.taskScheduler;
    }

    /**
     * Configure how long to wait for a receipt before it is considered lost.
     * @param receiptTimeLimit the limit in milliseconds; must be greater than zero
     * @throws IllegalArgumentException if the value is not positive
     */
    public void setReceiptTimeLimit(long receiptTimeLimit) {
        Assert.isTrue(receiptTimeLimit > 0L, "Receipt time limit must be larger than zero");
        this.receiptTimeLimit = receiptTimeLimit;
    }

    /**
     * Return the receipt time limit in milliseconds.
     */
    public long getReceiptTimeLimit() {
        return this.receiptTimeLimit;
    }

    @Override
    public void setAutoReceipt(boolean autoReceiptEnabled) {
        this.autoReceiptEnabled = autoReceiptEnabled;
    }

    /**
     * Whether a receipt header is added automatically to frames that lack one.
     */
    public boolean isAutoReceiptEnabled() {
        return this.autoReceiptEnabled;
    }

    @Override
    public boolean isConnected() {
        return this.connection != null;
    }

    @Override
    public StompSession.Receiptable send(String destination, Object payload) {
        StompHeaders stompHeaders = new StompHeaders();
        stompHeaders.setDestination(destination);
        return send(stompHeaders, payload);
    }

    /**
     * Send a SEND frame with the given headers and payload.
     * @param stompHeaders the frame headers; must contain a destination and may
     * be updated with a generated receipt header
     * @param payload the payload, converted via the configured {@link MessageConverter}
     * unless it is {@code null} or a {@code byte[]}
     * @return a handle for registering receipt / receipt-lost callbacks
     */
    @Override
    public StompSession.Receiptable send(StompHeaders stompHeaders, Object payload) {
        Assert.hasText(stompHeaders.getDestination(), "'destination' header is required");
        String receiptId = checkOrAddReceipt(stompHeaders);
        ReceiptHandler receiptable = new ReceiptHandler(receiptId);
        StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SEND);
        accessor.addNativeHeaders(stompHeaders);
        Message<byte[]> message = createMessage(accessor, payload);
        execute(message);
        return receiptable;
    }

    /**
     * Return the receipt id from the given headers, generating and storing one
     * first when auto-receipt is enabled and the headers have none.
     * @return the receipt id, or {@code null} if receipts are not in use for the frame
     */
    private String checkOrAddReceipt(StompHeaders stompHeaders) {
        String receiptId = stompHeaders.getReceipt();
        if (isAutoReceiptEnabled() && receiptId == null) {
            receiptId = String.valueOf(this.receiptIndex.getAndIncrement());
            stompHeaders.setReceipt(receiptId);
        }
        return receiptId;
    }

    /**
     * Create a mutable header accessor for the given command, tagged with this
     * session's id.
     */
    private StompHeaderAccessor createHeaderAccessor(StompCommand command) {
        StompHeaderAccessor accessor = StompHeaderAccessor.create(command);
        accessor.setSessionId(this.sessionId);
        accessor.setLeaveMutable(true);
        return accessor;
    }

    /**
     * Build the outgoing message for the given headers and payload.
     * <p>A {@code null} payload becomes {@link #EMPTY_PAYLOAD}, a {@code byte[]}
     * is used as is, and anything else goes through the {@link MessageConverter}.
     * @throws MessageConversionException if the converter returns {@code null}
     */
    @SuppressWarnings("unchecked")
    private Message<byte[]> createMessage(StompHeaderAccessor accessor, Object payload) {
        accessor.updateSimpMessageHeadersFromStompHeaders();
        Message<byte[]> message;
        if (payload == null) {
            message = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
        } else if (payload instanceof byte[]) {
            message = MessageBuilder.createMessage((byte[]) payload, accessor.getMessageHeaders());
        } else {
            // The frame is written from a byte[] payload; the converter's result is treated as such.
            message = (Message<byte[]>) getMessageConverter().toMessage(payload, accessor.getMessageHeaders());
            // Sync headers back before the null check so the error message can report the content type.
            accessor.updateStompHeadersFromSimpMessageHeaders();
            if (message == null) {
                throw new MessageConversionException("Unable to convert payload with type='" + payload.getClass().getName() + "', contentType='" + accessor.getContentType() + "', converter=[" + getMessageConverter() + "]");
            }
        }
        return message;
    }

    /**
     * Send the message over the current connection and block until the send
     * completes.
     * @throws IllegalStateException if there is no connection
     * @throws MessageDeliveryException if the send fails or the wait is interrupted
     */
    private void execute(Message<byte[]> message) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        if (logger.isTraceEnabled()) {
            logger.trace("Sending " + accessor.getDetailedLogMessage(message.getPayload()));
        }
        // Read the volatile field once so the null check and the send use the same connection.
        TcpConnection<byte[]> conn = this.connection;
        Assert.state(conn != null, "Connection closed");
        try {
            conn.send(message).get();
        }
        catch (ExecutionException ex) {
            throw new MessageDeliveryException(message, ex.getCause());
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new MessageDeliveryException(message, ex);
        }
        catch (Throwable ex) {
            throw new MessageDeliveryException(message, ex);
        }
    }

    @Override
    public StompSession.Subscription subscribe(String destination, StompFrameHandler handler) {
        StompHeaders stompHeaders = new StompHeaders();
        stompHeaders.setDestination(destination);
        return subscribe(stompHeaders, handler);
    }

    /**
     * Send a SUBSCRIBE frame and register the handler for matching MESSAGE frames.
     * @param stompHeaders the frame headers; must contain a destination and may be
     * updated with a generated subscription id and receipt header
     * @param handler the handler for frames received on the subscription (required)
     * @return a handle for unsubscribing and for tracking the receipt
     */
    @Override
    public StompSession.Subscription subscribe(StompHeaders stompHeaders, StompFrameHandler handler) {
        String destination = stompHeaders.getDestination();
        Assert.hasText(destination, "'destination' is required");
        Assert.notNull(handler, "'handler' is required");
        String subscriptionId = stompHeaders.getId();
        if (!StringUtils.hasText(subscriptionId)) {
            subscriptionId = String.valueOf(this.subscriptionIndex.getAndIncrement());
            stompHeaders.setId(subscriptionId);
        }
        String receiptId = checkOrAddReceipt(stompHeaders);
        // The subscription registers itself in the subscriptions map on construction.
        DefaultSubscription subscription = new DefaultSubscription(subscriptionId, destination, receiptId, handler);
        StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.SUBSCRIBE);
        accessor.addNativeHeaders(stompHeaders);
        Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
        execute(message);
        return subscription;
    }

    /**
     * Send an ACK ({@code consumed == true}) or NACK frame for the given message.
     * <p>The id is sent in the message-id header when the connected version is
     * "1.1" and in the id header otherwise.
     * @param messageId the id of the message to acknowledge
     * @param consumed whether the message was consumed
     * @return a handle for registering receipt / receipt-lost callbacks
     */
    @Override
    public StompSession.Receiptable acknowledge(String messageId, boolean consumed) {
        StompHeaders stompHeaders = new StompHeaders();
        if ("1.1".equals(this.version)) {
            stompHeaders.setMessageId(messageId);
        } else {
            stompHeaders.setId(messageId);
        }
        String receiptId = checkOrAddReceipt(stompHeaders);
        ReceiptHandler receiptable = new ReceiptHandler(receiptId);
        StompCommand command = consumed ? StompCommand.ACK : StompCommand.NACK;
        StompHeaderAccessor accessor = createHeaderAccessor(command);
        accessor.addNativeHeaders(stompHeaders);
        Message<byte[]> message = createMessage(accessor, null);
        execute(message);
        return receiptable;
    }

    /**
     * Send an UNSUBSCRIBE frame for the subscription with the given id.
     */
    private void unsubscribe(String id) {
        StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.UNSUBSCRIBE);
        accessor.setSubscriptionId(id);
        Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
        execute(message);
    }

    /**
     * Send a DISCONNECT frame and close the connection. The connection is
     * reset even if sending the frame fails.
     */
    @Override
    public void disconnect() {
        // Mark as closing first so afterConnectionClosed() does not report a lost connection.
        this.closing = true;
        try {
            StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.DISCONNECT);
            Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
            execute(message);
        }
        finally {
            resetConnection();
        }
    }

    /**
     * Store the established connection and send the STOMP CONNECT frame,
     * built from the configured connect headers with accept-version "1.1,1.2".
     */
    @Override
    public void afterConnected(TcpConnection<byte[]> connection) {
        this.connection = connection;
        if (logger.isDebugEnabled()) {
            logger.debug("Connection established in session id=" + this.sessionId);
        }
        StompHeaderAccessor accessor = createHeaderAccessor(StompCommand.CONNECT);
        accessor.addNativeHeaders(this.connectHeaders);
        accessor.setAcceptVersion("1.1,1.2");
        Message<byte[]> message = createMessage(accessor, EMPTY_PAYLOAD);
        execute(message);
    }

    /**
     * Fail the session future and notify the session handler of the connect failure.
     */
    @Override
    public void afterConnectFailure(Throwable ex) {
        if (logger.isDebugEnabled()) {
            logger.debug("Failed to connect session id=" + this.sessionId, ex);
        }
        this.sessionFuture.setException(ex);
        this.sessionHandler.handleTransportError(this, ex);
    }

    /**
     * Dispatch an incoming frame by command:
     * <ul>
     * <li>MESSAGE - passed to the handler of the matching subscription</li>
     * <li>RECEIPT - completes the matching pending receipt</li>
     * <li>CONNECTED - sets up heartbeats, records the version and completes the session future</li>
     * <li>ERROR - passed to the session handler</li>
     * </ul>
     * Any exception raised while dispatching is forwarded to
     * {@link StompSessionHandler#handleException}.
     */
    @Override
    public void handleMessage(Message<byte[]> message) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        accessor.setSessionId(this.sessionId);
        StompCommand command = accessor.getCommand();
        Map<String, List<String>> nativeHeaders = accessor.getNativeHeaders();
        StompHeaders stompHeaders = StompHeaders.readOnlyStompHeaders(nativeHeaders);
        boolean isHeartbeat = accessor.isHeartbeat();
        if (logger.isTraceEnabled()) {
            logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload()));
        }
        try {
            if (StompCommand.MESSAGE.equals(command)) {
                DefaultSubscription subscription = this.subscriptions.get(stompHeaders.getSubscription());
                if (subscription != null) {
                    invokeHandler(subscription.getHandler(), message, stompHeaders);
                } else if (logger.isDebugEnabled()) {
                    logger.debug("No handler for: " + accessor.getDetailedLogMessage(message.getPayload()) + ". Perhaps just unsubscribed?");
                }
            } else if (StompCommand.RECEIPT.equals(command)) {
                String receiptId = stompHeaders.getReceiptId();
                ReceiptHandler handler = this.receiptHandlers.get(receiptId);
                if (handler != null) {
                    handler.handleReceiptReceived();
                } else if (logger.isDebugEnabled()) {
                    logger.debug("No matching receipt: " + accessor.getDetailedLogMessage(message.getPayload()));
                }
            } else if (StompCommand.CONNECTED.equals(command)) {
                initHeartbeatTasks(stompHeaders);
                this.version = stompHeaders.getFirst("version");
                this.sessionFuture.set(this);
                this.sessionHandler.afterConnected(this, stompHeaders);
            } else if (StompCommand.ERROR.equals(command)) {
                invokeHandler(this.sessionHandler, message, stompHeaders);
            } else if (!isHeartbeat && logger.isTraceEnabled()) {
                logger.trace("Message not handled.");
            }
        }
        catch (Throwable ex) {
            this.sessionHandler.handleException(this, command, stompHeaders, message.getPayload(), ex);
        }
    }

    /**
     * Pass a frame to the given handler. An empty payload is delivered as
     * {@code null}; otherwise the payload is converted to the type the handler
     * asks for.
     * @throws MessageConversionException if the converter returns {@code null}
     */
    private void invokeHandler(StompFrameHandler handler, Message<byte[]> message, StompHeaders stompHeaders) {
        if (message.getPayload().length == 0) {
            handler.handleFrame(stompHeaders, null);
            return;
        }
        Type type = handler.getPayloadType(stompHeaders);
        Class<?> payloadType = ResolvableType.forType(type).resolve();
        Object object = getMessageConverter().fromMessage(message, payloadType);
        if (object == null) {
            throw new MessageConversionException("No suitable converter, payloadType=" + payloadType + ", handlerType=" + handler.getClass());
        }
        handler.handleFrame(stompHeaders, object);
    }

    /**
     * Register heartbeat tasks on the connection from the heartbeat headers of
     * the CONNECT and CONNECTED frames. Does nothing if either header is absent.
     * <p>A write-inactivity task is registered when {@code connect[0]} and
     * {@code connected[1]} are both positive, using the larger value as the
     * interval. A read-inactivity task is registered when {@code connect[1]}
     * and {@code connected[0]} are both positive, using the larger value times
     * {@link #HEARTBEAT_MULTIPLIER}.
     */
    private void initHeartbeatTasks(StompHeaders connectedHeaders) {
        long[] connect = this.connectHeaders.getHeartbeat();
        long[] connected = connectedHeaders.getHeartbeat();
        if (connect == null || connected == null) {
            return;
        }
        TcpConnection<byte[]> conn = this.connection;
        if (connect[0] > 0L && connected[1] > 0L) {
            long interval = Math.max(connect[0], connected[1]);
            conn.onWriteInactivity(new WriteInactivityTask(), interval);
        }
        if (connect[1] > 0L && connected[0] > 0L) {
            long interval = Math.max(connect[1], connected[0]) * HEARTBEAT_MULTIPLIER;
            conn.onReadInactivity(new ReadInactivityTask(), interval);
        }
    }

    /**
     * Fail the session future and notify the session handler of a transport
     * error. Exceptions thrown while doing so are logged at debug level and
     * not propagated.
     */
    @Override
    public void handleFailure(Throwable ex) {
        try {
            this.sessionFuture.setException(ex);
            this.sessionHandler.handleTransportError(this, ex);
        }
        catch (Throwable ex2) {
            if (logger.isDebugEnabled()) {
                logger.debug("Uncaught failure while handling transport failure", ex2);
            }
        }
    }

    /**
     * Called when the connection was closed. Unless this session initiated the
     * close, the connection is reset and a {@link ConnectionLostException} is
     * reported through {@link #handleFailure(Throwable)}.
     */
    @Override
    public void afterConnectionClosed() {
        if (logger.isDebugEnabled()) {
            logger.debug("Connection closed session id=" + this.sessionId);
        }
        if (!this.closing) {
            resetConnection();
            handleFailure(new ConnectionLostException("Connection closed"));
        }
    }

    /**
     * Clear the connection reference and close the connection, if any.
     * Closing is best-effort: failures are logged at debug level only.
     */
    private void resetConnection() {
        TcpConnection<byte[]> conn = this.connection;
        this.connection = null;
        if (conn != null) {
            try {
                conn.close();
            }
            catch (Throwable ex) {
                // Deliberately not propagated; the connection is being discarded anyway.
                if (logger.isDebugEnabled()) {
                    logger.debug("Failure while closing connection in session id=" + this.sessionId, ex);
                }
            }
        }
    }

    /**
     * Task run when nothing was read from the server within the read interval:
     * closes the connection and reports the failure.
     */
    private class ReadInactivityTask
    implements Runnable {

        @Override
        public void run() {
            // Mark as closing so the resulting close is not also reported as a lost connection.
            DefaultStompSession.this.closing = true;
            String error = "Server has gone quiet. Closing connection in session id=" + DefaultStompSession.this.sessionId + ".";
            if (logger.isDebugEnabled()) {
                logger.debug(error);
            }
            resetConnection();
            handleFailure(new IllegalStateException(error));
        }
    }

    /**
     * Task run when nothing was written within the write interval: sends a
     * heartbeat and reports a failure if the heartbeat cannot be sent.
     */
    private class WriteInactivityTask
    implements Runnable {

        @Override
        public void run() {
            TcpConnection<byte[]> conn = DefaultStompSession.this.connection;
            if (conn != null) {
                conn.send(HEARTBEAT).addCallback(new ListenableFutureCallback<Void>() {

                    @Override
                    public void onSuccess(Void result) {
                        // nothing to do
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        handleFailure(ex);
                    }
                });
            }
        }
    }

    /**
     * A registered subscription. Instances add themselves to the enclosing
     * session's subscription map when created and remove themselves on
     * {@link #unsubscribe()}.
     */
    private class DefaultSubscription
    extends ReceiptHandler
    implements StompSession.Subscription {
        private final String id;
        private final String destination;
        private final StompFrameHandler handler;

        public DefaultSubscription(String id, String destination, String receiptId, StompFrameHandler handler) {
            super(receiptId);
            Assert.notNull(destination, "'destination' is required");
            Assert.notNull(handler, "'handler' handler is required");
            this.id = id;
            this.destination = destination;
            this.handler = handler;
            DefaultStompSession.this.subscriptions.put(id, this);
        }

        @Override
        public String getSubscriptionId() {
            return this.id;
        }

        public String getDestination() {
            return this.destination;
        }

        public StompFrameHandler getHandler() {
            return this.handler;
        }

        /**
         * Remove the local registration first, then send the UNSUBSCRIBE frame.
         */
        @Override
        public void unsubscribe() {
            DefaultStompSession.this.subscriptions.remove(getSubscriptionId());
            DefaultStompSession.this.unsubscribe(getSubscriptionId());
        }

        @Override
        public String toString() {
            return "Subscription [id=" + getSubscriptionId() + ", destination='" + getDestination() + "', receiptId='" + getReceiptId() + "', handler=" + getHandler() + "]";
        }
    }

    /**
     * Tracks the outcome of a single receipt. When created with a non-null
     * receipt id it registers itself with the enclosing session and schedules
     * a timeout after which the receipt is treated as lost.
     */
    private class ReceiptHandler
    implements StompSession.Receiptable {
        private final String receiptId;
        private final List<Runnable> receiptCallbacks = new ArrayList<Runnable>(2);
        private final List<Runnable> receiptLostCallbacks = new ArrayList<Runnable>(2);

        /** The scheduled timeout task, if receipt tracking is active. */
        private ScheduledFuture<?> future;

        /** {@code null} while pending, then TRUE (received) or FALSE (lost). */
        private Boolean result;

        public ReceiptHandler(String receiptId) {
            this.receiptId = receiptId;
            if (this.receiptId != null) {
                initReceiptHandling();
            }
        }

        /**
         * Register this handler under its receipt id and schedule the timeout.
         * @throws IllegalArgumentException if no {@link TaskScheduler} is configured
         */
        private void initReceiptHandling() {
            Assert.notNull(getTaskScheduler(), "To track receipts a TaskScheduler must be configured");
            DefaultStompSession.this.receiptHandlers.put(this.receiptId, this);
            Date startTime = new Date(System.currentTimeMillis() + getReceiptTimeLimit());
            this.future = getTaskScheduler().schedule(new Runnable() {

                @Override
                public void run() {
                    handleReceiptNotReceived();
                }
            }, startTime);
        }

        @Override
        public String getReceiptId() {
            return this.receiptId;
        }

        @Override
        public void addReceiptTask(Runnable task) {
            addTask(task, true);
        }

        @Override
        public void addReceiptLostTask(Runnable task) {
            addTask(task, false);
        }

        /**
         * Register a callback, or run it right away if the outcome it is
         * waiting for has already happened.
         * @param task the callback
         * @param successTask {@code true} for a receipt callback, {@code false}
         * for a receipt-lost callback
         * @throws IllegalArgumentException if this handler has no receipt id
         */
        private void addTask(Runnable task, boolean successTask) {
            Assert.notNull(this.receiptId, "To track receipts, set autoReceiptEnabled=true or add 'receiptId' header");
            synchronized (this) {
                if (this.result != null && this.result == successTask) {
                    invoke(Arrays.asList(task));
                } else if (successTask) {
                    this.receiptCallbacks.add(task);
                } else {
                    this.receiptLostCallbacks.add(task);
                }
            }
        }

        /**
         * Run each callback in turn. A failing callback does not prevent the
         * remaining ones from running; the failure is logged at debug level.
         */
        private void invoke(List<Runnable> callbacks) {
            for (Runnable runnable : callbacks) {
                try {
                    runnable.run();
                }
                catch (Throwable ex) {
                    // Deliberately not propagated so one bad callback cannot block the others.
                    if (logger.isDebugEnabled()) {
                        logger.debug("Receipt callback failed for receiptId=" + this.receiptId, ex);
                    }
                }
            }
        }

        public void handleReceiptReceived() {
            handleInternal(true);
        }

        public void handleReceiptNotReceived() {
            handleInternal(false);
        }

        /**
         * Record the outcome once, run the matching callbacks, deregister this
         * handler and cancel the timeout task. Later calls are ignored.
         */
        private void handleInternal(boolean result) {
            synchronized (this) {
                if (this.result != null) {
                    return;
                }
                this.result = result;
                invoke(result ? this.receiptCallbacks : this.receiptLostCallbacks);
                DefaultStompSession.this.receiptHandlers.remove(this.receiptId);
                if (this.future != null) {
                    this.future.cancel(true);
                }
            }
        }
    }
}

