/*
 * Decompiled with CFR 0.152.
 */
package com.caucho.message.stomp;

import com.caucho.message.DistributionMode;
import com.caucho.message.broker.BrokerReceiver;
import com.caucho.message.broker.BrokerSender;
import com.caucho.message.broker.MessageBroker;
import com.caucho.message.broker.ReceiverMessageHandler;
import com.caucho.message.broker.SenderSettleHandler;
import com.caucho.message.stomp.NullSender;
import com.caucho.message.stomp.StompAbortCommand;
import com.caucho.message.stomp.StompAckCommand;
import com.caucho.message.stomp.StompBeginCommand;
import com.caucho.message.stomp.StompCommand;
import com.caucho.message.stomp.StompCommitCommand;
import com.caucho.message.stomp.StompConnectCommand;
import com.caucho.message.stomp.StompDisconnectCommand;
import com.caucho.message.stomp.StompNackCommand;
import com.caucho.message.stomp.StompProtocol;
import com.caucho.message.stomp.StompSendCommand;
import com.caucho.message.stomp.StompSubscribeCommand;
import com.caucho.message.stomp.StompUnsubscribeCommand;
import com.caucho.message.stomp.StompXaItem;
import com.caucho.network.listen.AbstractProtocolConnection;
import com.caucho.network.listen.SocketLink;
import com.caucho.util.CharBuffer;
import com.caucho.vfs.ReadStream;
import com.caucho.vfs.WriteStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class StompConnection
extends AbstractProtocolConnection {
    private static final Logger log = Logger.getLogger(StompConnection.class.getName());
    private static final HashMap<CharBuffer, StompCommand> _commandMap = new HashMap();
    private static final CharBuffer CONTENT_LENGTH = new CharBuffer("content-length");
    private static final CharBuffer CONTENT_TYPE = new CharBuffer("content-type");
    private static final CharBuffer DESTINATION = new CharBuffer("destination");
    private static final CharBuffer ID = new CharBuffer("id");
    private static final CharBuffer MESSAGE_ID = new CharBuffer("message-id");
    private static final CharBuffer PERSISTENT = new CharBuffer("persistent");
    private static final CharBuffer RECEIPT = new CharBuffer("receipt");
    private static final CharBuffer SUBSCRIPTION = new CharBuffer("subscription");
    private static final CharBuffer TRANSACTION = new CharBuffer("transaction");
    private static final BrokerSender NULL_DESTINATION = new NullSender();
    private StompProtocol _stomp;
    private SocketLink _link;
    private HashMap<String, BrokerSender> _destinationMap = new HashMap();
    private HashMap<String, BrokerReceiver> _subscriptionMap = new HashMap();
    private CharBuffer _method = new CharBuffer();
    private char[] _headerBuffer = new char[4096];
    private int _headerOffset;
    private String _destinationName;
    private long _contentLength;
    private String _contentType;
    private String _id;
    private long _messageId;
    private String _receipt;
    private String _subscription;
    private String _transaction;
    private long _xid;
    private long _sessionId;
    private ArrayList<StompXaItem> _xaList;

    StompConnection(StompProtocol stomp, SocketLink link) {
        this._stomp = stomp;
        this._link = link;
    }

    @Override
    public String getProtocolRequestURL() {
        return "stomp:";
    }

    @Override
    public void init() {
    }

    SocketLink getLink() {
        return this._link;
    }

    ReadStream getReadStream() {
        return this._link.getReadStream();
    }

    WriteStream getWriteStream() {
        return this._link.getWriteStream();
    }

    public long getSessionId() {
        return this._sessionId;
    }

    public long getContentLength() {
        return this._contentLength;
    }

    public long getMessageId() {
        return this._messageId;
    }

    public String getSubscription() {
        return this._subscription;
    }

    public String getContentType() {
        return this._contentType;
    }

    public BrokerSender getDestination() {
        if (this._destinationName == null) {
            return null;
        }
        BrokerSender dest = this._destinationMap.get(this._destinationName);
        if (dest == null) {
            dest = this._stomp.createDestination(this._destinationName);
            if (dest != null) {
                this._destinationMap.put(this._destinationName, dest);
            } else {
                dest = NULL_DESTINATION;
            }
        }
        return dest;
    }

    public String getId() {
        return this._id;
    }

    public long getXid() {
        return this._xid;
    }

    public String getReceipt() {
        return this._receipt;
    }

    public SenderSettleHandler createReceiptCallback() {
        if (this._receipt != null) {
            return new ReceiptListener(this, this._receipt);
        }
        return null;
    }

    public String getTransaction() {
        return this._transaction;
    }

    public boolean subscribe() throws IOException {
        if (this._id == null) {
            throw new IOException("sub requires id");
        }
        if (this._destinationName == null) {
            throw new IOException("sub requires destination");
        }
        BrokerReceiver sub = this._subscriptionMap.get(this._id);
        if (sub != null) {
            throw new IOException("sub exists");
        }
        MessageBroker broker = this._stomp.getBroker();
        MessageListener listener = new MessageListener(this, this._id, this._destinationName);
        DistributionMode distMode = null;
        Map<String, Object> nodeProperties = null;
        sub = broker.createReceiver(this._destinationName, distMode, nodeProperties, listener);
        this._subscriptionMap.put(this._id, sub);
        return true;
    }

    public boolean unsubscribe(String id) {
        BrokerReceiver sub = this._subscriptionMap.remove(id);
        if (sub != null) {
            sub.close();
            return true;
        }
        return false;
    }

    public boolean ack(String sid, long mid) {
        BrokerReceiver sub = this._subscriptionMap.get(sid);
        if (sub != null) {
            sub.accepted(this._xid, mid);
            return true;
        }
        return false;
    }

    public boolean nack(String sid, long mid) {
        BrokerReceiver sub = this._subscriptionMap.get(sid);
        if (sub != null) {
            sub.rejected(this._xid, mid, null);
            return true;
        }
        return false;
    }

    public boolean begin(String tid) {
        this._xaList = new ArrayList();
        return true;
    }

    public boolean commit(String tid) {
        ArrayList<StompXaItem> xaList = this._xaList;
        this._xaList = null;
        for (StompXaItem xaItem : xaList) {
            xaItem.doCommand(this);
        }
        return true;
    }

    public boolean abort(String tid) {
        this._xaList = null;
        return true;
    }

    void addXaItem(StompXaItem xaItem) {
        this._xaList.add(xaItem);
    }

    @Override
    public boolean handleRequest() throws IOException {
        ReadStream is = this._link.getReadStream();
        if (!this.readMethod(is)) {
            return false;
        }
        StompCommand cmd = _commandMap.get(this._method);
        if (cmd == null) {
            throw new IOException("unknown command: " + this._method);
        }
        this.clearHeaders();
        while (this.readHeader(is)) {
        }
        WriteStream os = this._link.getWriteStream();
        System.out.println("CMD: " + cmd + " " + os);
        return cmd.doCommand(this, is, os);
    }

    private void clearHeaders() {
        this._contentLength = -1L;
        this._contentType = null;
        this._destinationName = null;
        this._id = null;
        this._receipt = null;
        this._messageId = -1L;
        this._transaction = null;
    }

    private boolean readMethod(ReadStream is) throws IOException {
        CharBuffer method = this._method;
        method.clear();
        int ch = is.read();
        while (65 <= ch && ch <= 90) {
            method.append((char)ch);
            ch = is.read();
        }
        return ch == 10;
    }

    private boolean readHeader(ReadStream is) throws IOException {
        char[] buffer = this._headerBuffer;
        int keyHead = 0;
        int keyTail = 0;
        int valueHead = 0;
        int valueTail = 0;
        int ch = is.read();
        while (ch > 0 && ch != 58 && ch != 10) {
            buffer[keyTail++] = (char)ch;
            ch = is.read();
        }
        if (ch == 10) {
            return false;
        }
        if (ch != 58) {
            throw new IOException("bad protocol");
        }
        buffer[keyTail] = 58;
        valueTail = valueHead = keyTail + 1;
        ch = is.read();
        while (ch > 0 && ch != 10) {
            buffer[valueTail++] = (char)ch;
            ch = is.read();
        }
        if (ch != 10) {
            throw new IOException("bad protocol2");
        }
        buffer[valueTail] = 10;
        this.handleHeader(buffer, keyHead, keyTail - keyHead, buffer, valueHead, valueTail - valueHead);
        return true;
    }

    private void handleHeader(char[] keyBuffer, int keyOffset, int keyLength, char[] valueBuffer, int valueOffset, int valueLength) throws IOException {
        int code = (keyLength << 16) + keyBuffer[keyOffset];
        System.out.println("CODE: " + Integer.toHexString(code));
        switch (code) {
            case 917603: {
                if (!CONTENT_LENGTH.equals(keyBuffer, keyOffset, keyLength)) break;
                this._contentLength = this.parseLong(valueBuffer, valueOffset, valueLength);
                break;
            }
            case 786531: {
                if (!CONTENT_TYPE.equals(keyBuffer, keyOffset, keyLength)) break;
                this._contentType = new String(valueBuffer, valueOffset, valueLength);
                break;
            }
            case 720996: {
                if (!DESTINATION.equals(keyBuffer, keyOffset, keyLength)) break;
                this._destinationName = new String(valueBuffer, valueOffset, valueLength);
                break;
            }
            case 131177: {
                if (!ID.equals(keyBuffer, keyOffset, keyLength)) break;
                this._id = new String(valueBuffer, valueOffset, valueLength);
                break;
            }
            case 655469: {
                if (!MESSAGE_ID.equals(keyBuffer, keyOffset, keyLength)) break;
                this._messageId = this.parseLong(valueBuffer, valueOffset, valueLength);
                break;
            }
            case 458866: {
                if (!RECEIPT.equals(keyBuffer, keyOffset, keyLength)) break;
                this._receipt = new String(valueBuffer, valueOffset, valueLength);
                break;
            }
            case 786547: {
                if (!SUBSCRIPTION.equals(keyBuffer, keyOffset, keyLength)) break;
                this._subscription = new String(valueBuffer, valueOffset, valueLength);
                break;
            }
            case 721012: {
                if (!TRANSACTION.equals(keyBuffer, keyOffset, keyLength)) break;
                this._transaction = new String(valueBuffer, valueOffset, valueLength);
                break;
            }
            default: {
                System.out.println("HH: " + new String(keyBuffer, keyOffset, keyLength) + " " + new String(valueBuffer, valueOffset, valueLength));
            }
        }
    }

    private long parseLong(char[] buffer, int offset, int length) {
        long value = 0L;
        for (int i = 0; i < length; ++i) {
            value = 10L * value + (long)buffer[offset + i] - 48L;
        }
        return value;
    }

    void receipt(String receipt) {
        try {
            WriteStream out = this._link.getWriteStream();
            out.print("RECEIPT\nreceipt-id:");
            out.print(receipt);
            out.print("\n\n\u0000");
            out.flush();
        }
        catch (IOException e) {
            log.log(Level.FINER, e.toString(), e);
        }
    }

    void message(String subscription, String destination, long messageId, InputStream bodyIs, long contentLength) throws IOException {
        WriteStream out = this._link.getWriteStream();
        out.print("MESSAGE");
        out.print("\nsubscription:");
        out.print(subscription);
        out.print("\ndestination:");
        out.print(destination);
        out.print("\nmessage-id:");
        out.print(messageId);
        if (contentLength >= 0L) {
            out.print("\ncontent-length:");
            out.print(contentLength);
            out.print("\n\n");
            out.writeStream(bodyIs, (int)contentLength);
        } else {
            out.print("\n\n");
            out.writeStream(bodyIs);
        }
        out.print("\u0000");
        out.flush();
    }

    @Override
    public boolean handleResume() throws IOException {
        return false;
    }

    @Override
    public boolean isWaitForRead() {
        return false;
    }

    @Override
    public void onCloseConnection() {
        ArrayList<BrokerSender> destList = new ArrayList<BrokerSender>(this._destinationMap.values());
        this._destinationMap.clear();
        ArrayList<BrokerReceiver> subList = new ArrayList<BrokerReceiver>(this._subscriptionMap.values());
        this._destinationMap.clear();
        this._subscriptionMap.clear();
        for (BrokerSender dest : destList) {
            dest.close();
        }
        for (BrokerReceiver sub : subList) {
            sub.close();
        }
        this._xaList = null;
    }

    @Override
    public void onStartConnection() {
    }

    static {
        _commandMap.put(new CharBuffer("ABORT"), new StompAbortCommand());
        _commandMap.put(new CharBuffer("ACK"), new StompAckCommand());
        _commandMap.put(new CharBuffer("BEGIN"), new StompBeginCommand());
        _commandMap.put(new CharBuffer("COMMIT"), new StompCommitCommand());
        _commandMap.put(new CharBuffer("CONNECT"), new StompConnectCommand());
        _commandMap.put(new CharBuffer("DISCONNECT"), new StompDisconnectCommand());
        _commandMap.put(new CharBuffer("NACK"), new StompNackCommand());
        _commandMap.put(new CharBuffer("SEND"), new StompSendCommand());
        _commandMap.put(new CharBuffer("SUBSCRIBE"), new StompSubscribeCommand());
        _commandMap.put(new CharBuffer("UNSUBSCRIBE"), new StompUnsubscribeCommand());
    }

    static class MessageListener
    implements ReceiverMessageHandler {
        private StompConnection _conn;
        private String _subscription;
        private String _destination;

        MessageListener(StompConnection conn, String subscription, String destination) {
            this._conn = conn;
            this._subscription = subscription;
            this._destination = destination;
        }

        @Override
        public void onMessage(long messageId, InputStream bodyIs, long contentLength) throws IOException {
            this._conn.message(this._subscription, this._destination, messageId, bodyIs, contentLength);
        }
    }

    static class ReceiptListener
    implements SenderSettleHandler {
        private StompConnection _conn;
        private String _receipt;

        ReceiptListener(StompConnection conn, String receipt) {
            this._conn = conn;
            this._receipt = receipt;
        }

        @Override
        public boolean isSettled() {
            return true;
        }

        @Override
        public void onAccepted(long mid) {
            this._conn.receipt(this._receipt);
        }

        @Override
        public void onRejected(long mid, String msg) {
        }
    }
}

