/*
 * Decompiled with CFR 0.152.
 */
package com.signalfx.signalflow;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.signalfx.endpoint.SignalFxEndpoint;
import com.signalfx.signalflow.Channel;
import com.signalfx.signalflow.ChannelMessage;
import com.signalfx.signalflow.SignalFlowException;
import com.signalfx.signalflow.SignalFlowTransport;
import com.signalfx.signalflow.StreamMessage;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketClient;
import org.eclipse.jetty.websocket.WebSocketClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebSocketTransport
implements SignalFlowTransport {
    protected static final Logger log = LoggerFactory.getLogger(WebSocketTransport.class);
    public static final Integer DEFAULT_TIMEOUT = 1;
    protected final String token;
    protected final SignalFxEndpoint endpoint;
    protected final String path;
    protected Integer timeout = DEFAULT_TIMEOUT;
    protected WebSocketClient webSocketClient;
    protected TransportConnection transportConnection;

    protected WebSocketTransport(String token, SignalFxEndpoint endpoint, int apiVersion, Integer timeout) {
        this.token = token;
        this.endpoint = endpoint;
        this.path = "/v" + apiVersion + "/signalflow/connect";
        this.timeout = timeout;
        try {
            WebSocketClientFactory factory = new WebSocketClientFactory();
            factory.start();
            this.webSocketClient = factory.newWebSocketClient();
            URIBuilder uriBuilder = new URIBuilder(String.format("%s://%s:%s%s", endpoint.getScheme(), endpoint.getHostname(), endpoint.getPort(), this.path));
            this.transportConnection = new TransportConnection(token);
            this.webSocketClient.open(uriBuilder.build(), (WebSocket)this.transportConnection, (long)timeout.intValue(), TimeUnit.SECONDS);
        }
        catch (Exception ex) {
            throw new SignalFlowException("failed to construct websocket transport", ex);
        }
    }

    @Override
    public Channel attach(String handle, Map<String, String> parameters) {
        log.debug("attach: [ {} ] with parameters: {}", (Object)handle, parameters);
        TransportChannel channel = new TransportChannel(this.transportConnection);
        HashMap<String, String> request = new HashMap<String, String>(parameters);
        request.put("type", "attach");
        request.put("handle", handle);
        this.transportConnection.sendMessage(channel, request);
        return channel;
    }

    @Override
    public Channel execute(String program, Map<String, String> parameters) {
        log.debug("execute: [ {} ] with parameters: {}", (Object)program, parameters);
        TransportChannel channel = new TransportChannel(this.transportConnection);
        HashMap<String, String> request = new HashMap<String, String>(parameters);
        request.put("type", "execute");
        request.put("program", program);
        this.transportConnection.sendMessage(channel, request);
        return channel;
    }

    @Override
    public void start(String program, Map<String, String> parameters) {
        log.debug("start: [ {} ] with parameters: {}", (Object)program, parameters);
        HashMap<String, String> request = new HashMap<String, String>(parameters);
        request.put("type", "start");
        request.put("program", program);
        this.transportConnection.sendMessage(request);
    }

    @Override
    public void stop(String handle, Map<String, String> parameters) {
        log.debug("stop: [ {} ] with parameters: {}", (Object)handle, parameters);
        HashMap<String, String> request = new HashMap<String, String>(parameters);
        request.put("type", "stop");
        request.put("handle", handle);
        this.transportConnection.sendMessage(request);
    }

    @Override
    public void close(int code, String reason) {
        if (this.transportConnection.getConnection() != null && this.transportConnection.getConnection().isOpen()) {
            this.transportConnection.close(code, reason);
            try {
                this.webSocketClient.getFactory().stop();
            }
            catch (Exception ex) {
                log.error("error while stopping websocketfactory", (Throwable)ex);
            }
            log.debug("transport closed");
        }
    }

    @Override
    public void keepalive(String handle) {
        log.debug("keepalive: [ {} ]", (Object)handle);
        HashMap<String, String> request = new HashMap<String, String>();
        request.put("type", "keepalive");
        request.put("handle", handle);
        this.transportConnection.sendMessage(request);
    }

    protected static class TransportEventStreamParser
    implements Iterator<StreamMessage> {
        protected Queue<StreamMessage> messageQueue;
        protected boolean isClosed = false;

        public TransportEventStreamParser(Queue<StreamMessage> messageQueue) {
            this.messageQueue = messageQueue;
        }

        @Override
        public boolean hasNext() {
            return !this.isClosed;
        }

        @Override
        public StreamMessage next() {
            StreamMessage streamMessage = null;
            block6: while (!this.isClosed && streamMessage == null) {
                streamMessage = this.messageQueue.poll();
                if (streamMessage != null) {
                    switch (streamMessage.getKind()) {
                        case CONTROL: {
                            ChannelMessage channelMessage = ChannelMessage.decodeStreamMessage(streamMessage);
                            if (channelMessage.getType() != ChannelMessage.Type.END_OF_CHANNEL && channelMessage.getType() != ChannelMessage.Type.CHANNEL_ABORT) continue block6;
                            this.close();
                            continue block6;
                        }
                        case ERROR: {
                            if (!(streamMessage instanceof SignalFlowExceptionStreamMessage)) continue block6;
                            this.close();
                            throw ((SignalFlowExceptionStreamMessage)streamMessage).getException();
                        }
                    }
                    continue;
                }
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException ex) {
                    this.close();
                }
            }
            if (streamMessage != null) {
                return streamMessage;
            }
            throw new NoSuchElementException("no more stream messages");
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("remove from stream not supported");
        }

        public void close() {
            this.isClosed = true;
        }
    }

    protected static class TransportChannel
    extends Channel {
        protected static final Logger log = LoggerFactory.getLogger(TransportChannel.class);
        protected TransportConnection connection;
        protected Queue<StreamMessage> messageQueue = new ConcurrentLinkedQueue<StreamMessage>();
        protected TransportEventStreamParser parser = new TransportEventStreamParser(this.messageQueue);

        public TransportChannel(TransportConnection sharedConnection) {
            this.connection = sharedConnection;
            this.iterator = this.parser;
            this.connection.add(this);
            log.debug("constructed {} of type {}", (Object)this.toString(), (Object)this.getClass().getName());
        }

        public boolean offer(StreamMessage message) {
            return this.messageQueue.offer(message);
        }

        @Override
        public void close() {
            super.close();
            this.connection.remove(this);
        }
    }

    protected static class TransportConnection
    implements WebSocket.OnTextMessage,
    WebSocket.OnBinaryMessage {
        protected static final Logger log = LoggerFactory.getLogger(TransportConnection.class);
        protected String token;
        protected SignalFlowException error;
        protected WebSocket.Connection connection;
        protected Map<String, TransportChannel> channels = Collections.synchronizedMap(new HashMap());
        protected static ObjectMapper objectMapper = new ObjectMapper();

        public TransportConnection(String token) {
            this.token = token;
        }

        public void onClose(int code, String reason) {
            log.debug("websocket connection closed ({} {})", (Object)code, (Object)reason);
            if (code != 1000) {
                this.error = new SignalFlowException(code, reason);
                log.info("Lost WebSocket connection with {} ({}).", (Object)this.connection, (Object)code);
                SignalFlowExceptionStreamMessage errorMessage = new SignalFlowExceptionStreamMessage(this.error);
                for (TransportChannel channel : this.channels.values()) {
                    channel.offer(errorMessage);
                }
            }
            this.channels.clear();
            this.connection = null;
        }

        public void onOpen(WebSocket.Connection connection) {
            log.debug("open connection: {}", (Object)connection);
            this.connection = connection;
            HashMap<String, String> authRequest = new HashMap<String, String>();
            authRequest.put("type", "authenticate");
            authRequest.put("token", this.token);
            this.sendMessage(authRequest);
        }

        public void onMessage(byte[] data, int offset, int length) {
            try {
                byte[] messageBytes = Arrays.copyOfRange(data, offset, offset + length);
                TransportDataMessage dataMessage = new TransportDataMessage(messageBytes);
                if (dataMessage.getKind() == StreamMessage.Kind.DATA) {
                    LinkedHashMap<String, Object> dataMap = new LinkedHashMap<String, Object>();
                    dataMap.put("logicalTimestampMs", dataMessage.getLogicalTimestampMs());
                    dataMap.put("data", dataMessage.getData());
                    TransportChannel channel = this.channels.get(dataMessage.getChannelName());
                    if (channel != null && !channel.isClosed()) {
                        StreamMessage streamMessage = new StreamMessage("data", null, objectMapper.writeValueAsString(dataMap));
                        channel.offer(streamMessage);
                    } else {
                        log.debug("ignoring message. channel not found {}", (Object)dataMessage.getChannelName());
                    }
                }
            }
            catch (JsonProcessingException ex) {
                log.error("failed to process messages", (Throwable)ex);
            }
        }

        public void onMessage(String data) {
            try {
                Map dataMap = (Map)objectMapper.readValue(data, Map.class);
                String event = (String)dataMap.get("event");
                if ("KEEP_ALIVE".equals(event)) {
                    return;
                }
                String type = (String)dataMap.get("type");
                if (type == null) {
                    log.debug("type missing so ignoring message. {}", (Object)dataMap);
                    return;
                }
                if (type.equals("authenticated")) {
                    log.info("WebSocket connection authenticated as {} (in {})", dataMap.get("userId"), dataMap.get("orgId"));
                } else {
                    String channelName = (String)dataMap.get("channel");
                    if (channelName != null) {
                        TransportChannel channel = this.channels.get(channelName);
                        if (channel != null && !channel.isClosed()) {
                            StreamMessage message = new StreamMessage(type, null, data);
                            channel.offer(message);
                        } else {
                            log.debug("ignoring message. channel not found {}", (Object)channelName);
                        }
                    }
                }
            }
            catch (IOException ex) {
                log.error("failed to process messages", (Throwable)ex);
            }
        }

        public void sendMessage(Map<String, String> request) {
            try {
                String message = objectMapper.writeValueAsString(request);
                this.connection.sendMessage(message);
            }
            catch (IOException ex) {
                throw new SignalFlowException("failed to send message", ex);
            }
        }

        public void sendMessage(Channel channel, Map<String, String> request) {
            HashMap<String, String> channelRequest = new HashMap<String, String>(request);
            channelRequest.put("channel", channel.getName());
            try {
                String message = objectMapper.writeValueAsString(channelRequest);
                this.connection.sendMessage(message);
            }
            catch (IOException ex) {
                throw new SignalFlowException("failed to send message for channel " + channel.getName(), ex);
            }
        }

        public void add(TransportChannel channel) {
            this.channels.put(channel.getName(), channel);
        }

        public void remove(TransportChannel channel) {
            this.channels.remove(channel);
        }

        public void close(int code, String reason) {
            for (Channel channel : this.channels.values()) {
                channel.close();
            }
            this.channels.clear();
            this.connection.close(code, reason);
        }

        public WebSocket.Connection getConnection() {
            return this.connection;
        }

        static {
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        }
    }

    protected static class TransportDataMessage {
        protected byte version;
        protected StreamMessage.Kind kind;
        protected String channelName;
        protected long logicalTimestampMs;
        protected List<Map<String, Object>> data = new ArrayList<Map<String, Object>>();

        public TransportDataMessage(byte[] data) {
            try {
                ByteBuffer buffer = ByteBuffer.wrap(data);
                this.version = buffer.get(0);
                this.kind = StreamMessage.Kind.fromBinaryType(buffer.get(1));
                this.channelName = new String(data, 4, 16, "UTF-8");
                if (this.kind == StreamMessage.Kind.DATA) {
                    this.logicalTimestampMs = buffer.getLong(20);
                    byte[] payload = Arrays.copyOfRange(data, 32, data.length);
                    block6: for (int element = 0; element < payload.length / 17; ++element) {
                        int index = element * 17;
                        byte[] tsIdBytes = Arrays.copyOfRange(payload, index + 1, index + 9);
                        String encodedTsId = StringUtils.remove((String)DatatypeConverter.printBase64Binary((byte[])tsIdBytes), (String)"=");
                        HashMap<String, Object> elementMap = new HashMap<String, Object>(4);
                        elementMap.put("tsId", encodedTsId);
                        switch (payload[index]) {
                            case 1: {
                                elementMap.put("value", buffer.getLong(index + 41));
                                break;
                            }
                            case 2: {
                                elementMap.put("value", buffer.getDouble(index + 41));
                                break;
                            }
                            default: {
                                log.warn("ignoring data message with unknown value type {}", (Object)payload[index]);
                                continue block6;
                            }
                        }
                        this.data.add(elementMap);
                    }
                } else {
                    log.warn("Unsupported binary message type {}", (Object)this.kind);
                }
            }
            catch (Exception ex) {
                log.error("failed to construct transport data message", (Throwable)ex);
            }
        }

        public int getVersion() {
            return this.version;
        }

        public StreamMessage.Kind getKind() {
            return this.kind;
        }

        public String getChannelName() {
            return this.channelName;
        }

        public List<Map<String, Object>> getData() {
            return this.data;
        }

        public long getLogicalTimestampMs() {
            return this.logicalTimestampMs;
        }
    }

    protected static class SignalFlowExceptionStreamMessage
    extends StreamMessage {
        protected SignalFlowException exception;

        public SignalFlowExceptionStreamMessage(SignalFlowException exception) {
            super("error", null, exception.getMessage());
            this.exception = exception;
        }

        public SignalFlowException getException() {
            return this.exception;
        }
    }

    public static class TransportBuilder {
        private String token;
        private String protocol = "wss";
        private String host = "stream.signalfx.com";
        private int port = 443;
        private int timeout = 1;
        private int version = 2;

        public TransportBuilder(String token) {
            this.token = token;
        }

        public TransportBuilder setProtocol(String protocol) {
            this.protocol = protocol;
            return this;
        }

        public TransportBuilder setHost(String host) {
            this.host = host;
            return this;
        }

        public TransportBuilder setPort(int port) {
            this.port = port;
            return this;
        }

        public TransportBuilder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public TransportBuilder setAPIVersion(int version) {
            this.version = version;
            return this;
        }

        public WebSocketTransport build() {
            SignalFxEndpoint endpoint = new SignalFxEndpoint(this.protocol, this.host, this.port);
            WebSocketTransport transport = new WebSocketTransport(this.token, endpoint, this.version, this.timeout);
            return transport;
        }
    }
}

