/*
 * Decompiled with CFR 0.152.
 */
package org.noear.solon.net.stomp.broker;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.noear.solon.Utils;
import org.noear.solon.lang.Nullable;
import org.noear.solon.net.stomp.Frame;
import org.noear.solon.net.stomp.broker.impl.StompBrokerMedia;
import org.noear.solon.net.stomp.listener.StompListener;
import org.noear.solon.net.websocket.SubProtocolCapable;
import org.noear.solon.net.websocket.WebSocket;
import org.noear.solon.net.websocket.WebSocketListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ToStompWebSocketListener
implements WebSocketListener,
SubProtocolCapable {
    static Logger log = LoggerFactory.getLogger(ToStompWebSocketListener.class);
    private final StompBrokerMedia brokerMedia;

    protected ToStompWebSocketListener(String endpoint, StompBrokerMedia brokerMedia) {
        if (endpoint == null) {
            throw new IllegalArgumentException("Endpoint is not empty");
        }
        this.brokerMedia = brokerMedia;
    }

    public String getSubProtocols(@Nullable Collection<String> requestProtocols) {
        if (Utils.isEmpty(requestProtocols)) {
            return null;
        }
        return "stomp";
    }

    public void onOpen(WebSocket socket) {
        for (StompListener listener : this.brokerMedia.listeners) {
            listener.onOpen(socket);
        }
    }

    public void onMessage(WebSocket socket, ByteBuffer binary) throws IOException {
        String txt = Charset.forName("UTF-8").decode(binary).toString();
        this.onMessage(socket, txt);
    }

    public void onMessage(WebSocket socket, String text) throws IOException {
        AtomicBoolean decodeOk = new AtomicBoolean(Boolean.FALSE);
        this.brokerMedia.operations.getCodec().decode(text, frame -> {
            decodeOk.set(Boolean.TRUE);
            this.onStomp(socket, (Frame)frame, text);
        });
        if (!decodeOk.get()) {
            if (log.isDebugEnabled()) {
                log.debug("session ping, {}", (Object)socket.id());
            }
            this.brokerMedia.emitter.sendTo(socket, Frame.newBuilder().command("MESSAGE").payload(text).build());
        }
    }

    public void onClose(WebSocket socket) {
        for (StompListener listener : this.brokerMedia.listeners) {
            listener.onClose(socket);
        }
    }

    public void onError(WebSocket socket, Throwable error) {
        log.error("", error);
    }

    protected void onStomp(WebSocket socket, Frame frame, String text) {
        String command;
        switch (command = frame.getCommand() == null ? "" : frame.getCommand()) {
            case "STOMP": 
            case "CONNECT": {
                for (StompListener listener : this.brokerMedia.listeners) {
                    listener.onConnect(socket, frame);
                }
                break;
            }
            case "DISCONNECT": {
                for (StompListener listener : this.brokerMedia.listeners) {
                    listener.onDisconnect(socket, frame);
                }
                break;
            }
            case "SUBSCRIBE": {
                for (StompListener listener : this.brokerMedia.listeners) {
                    listener.onSubscribe(socket, frame);
                }
                break;
            }
            case "UNSUBSCRIBE": {
                for (StompListener listener : this.brokerMedia.listeners) {
                    listener.onUnsubscribe(socket, frame);
                }
                break;
            }
            case "SEND": {
                for (StompListener listener : this.brokerMedia.listeners) {
                    listener.onSend(socket, frame);
                }
                break;
            }
            case "ACK": 
            case "NACK": {
                for (StompListener listener : this.brokerMedia.listeners) {
                    listener.onAck(socket, frame);
                }
                break;
            }
            default: {
                log.warn("session unknown, {}\r\n{}", (Object)socket.id(), (Object)text);
                this.brokerMedia.emitter.sendTo(socket, Frame.newBuilder().command("UNKNOWN").payload(text).build());
            }
        }
    }
}

