/*
 * Decompiled with CFR 0.152.
 */
package com.signalfx.signalflow;

import com.signalfx.connection.AbstractHttpReceiverConnection;
import com.signalfx.endpoint.SignalFxEndpoint;
import com.signalfx.shaded.apache.http.HttpEntity;
import com.signalfx.shaded.apache.http.NameValuePair;
import com.signalfx.shaded.apache.http.StatusLine;
import com.signalfx.shaded.apache.http.client.config.RequestConfig;
import com.signalfx.shaded.apache.http.client.methods.CloseableHttpResponse;
import com.signalfx.shaded.apache.http.client.methods.HttpPost;
import com.signalfx.shaded.apache.http.client.utils.URIBuilder;
import com.signalfx.shaded.apache.http.entity.StringEntity;
import com.signalfx.shaded.apache.http.impl.conn.BasicHttpClientConnectionManager;
import com.signalfx.shaded.apache.http.message.BasicNameValuePair;
import com.signalfx.signalflow.Channel;
import com.signalfx.signalflow.SignalFlowException;
import com.signalfx.signalflow.SignalFlowTransport;
import com.signalfx.signalflow.StreamMessage;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerSentEventsTransport
implements SignalFlowTransport {
    protected static final Logger log = LoggerFactory.getLogger(ServerSentEventsTransport.class);
    public static final Integer DEFAULT_TIMEOUT = 1000;
    protected final String token;
    protected final SignalFxEndpoint endpoint;
    protected final String path;
    protected Integer timeout = DEFAULT_TIMEOUT;

    protected ServerSentEventsTransport(String token, SignalFxEndpoint endpoint, int apiVersion, Integer timeout) {
        this.token = token;
        this.endpoint = endpoint;
        this.path = "/v" + apiVersion + "/signalflow";
        this.timeout = timeout;
    }

    @Override
    public Channel attach(String handle, Map<String, String> parameters) {
        if (log.isDebugEnabled()) {
            log.debug("attach: [ {} ] with parameters: {}", (Object)handle, parameters);
        }
        TransportConnection connection = null;
        CloseableHttpResponse response = null;
        try {
            connection = new TransportConnection(this.endpoint, this.timeout);
            response = connection.post(this.token, this.path + "/" + handle + "/attach", parameters, null);
            return new TransportChannel(connection, response);
        }
        catch (Exception ex) {
            this.close(response);
            this.close(connection);
            throw new SignalFlowException("failed to create transport channel for attach", ex);
        }
    }

    @Override
    public Channel execute(String program, Map<String, String> parameters) throws SignalFlowException {
        if (log.isDebugEnabled()) {
            log.debug("execute: [ {} ] with parameters: {}", (Object)program, parameters);
        }
        TransportConnection connection = null;
        CloseableHttpResponse response = null;
        try {
            connection = new TransportConnection(this.endpoint, this.timeout);
            response = connection.post(this.token, this.path + "/execute", parameters, program);
            return new TransportChannel(connection, response);
        }
        catch (IOException ioex) {
            this.close(response);
            this.close(connection);
            throw new SignalFlowException("failed to create transport channel for execute", ioex);
        }
    }

    @Override
    public Channel preflight(String program, Map<String, String> parameters) throws SignalFlowException {
        if (log.isDebugEnabled()) {
            log.debug("preflight: [ {} ] with parameters: {}", (Object)program, parameters);
        }
        TransportConnection connection = null;
        CloseableHttpResponse response = null;
        try {
            connection = new TransportConnection(this.endpoint, this.timeout);
            response = connection.post(this.token, this.path + "/preflight", parameters, program);
            return new TransportChannel(connection, response);
        }
        catch (IOException ioex) {
            this.close(response);
            this.close(connection);
            throw new SignalFlowException("failed to create transport channel for execute", ioex);
        }
    }

    @Override
    public void start(String program, Map<String, String> parameters) {
        if (log.isDebugEnabled()) {
            log.debug("start: [ {} ] with parameters: {}", (Object)program, parameters);
        }
        TransportConnection connection = null;
        CloseableHttpResponse response = null;
        try {
            connection = new TransportConnection(this.endpoint, this.timeout);
            response = connection.post(this.token, this.path + "/start", parameters, program);
            this.close(response);
            this.close(connection);
        }
        catch (Exception ex) {
            try {
                throw new SignalFlowException("failed to start program - " + program, ex);
            }
            catch (Throwable throwable) {
                this.close(response);
                this.close(connection);
                throw throwable;
            }
        }
    }

    @Override
    public void stop(String handle, Map<String, String> parameters) {
        if (log.isDebugEnabled()) {
            log.debug("stop: [ {} ] with parameters: {}", (Object)handle, parameters);
        }
        TransportConnection connection = null;
        CloseableHttpResponse response = null;
        try {
            connection = new TransportConnection(this.endpoint, this.timeout);
            response = connection.post(this.token, this.path + "/" + handle + "/stop", parameters, null);
            this.close(response);
            this.close(connection);
        }
        catch (Exception ex) {
            try {
                throw new SignalFlowException("failed to stop program - " + handle, ex);
            }
            catch (Throwable throwable) {
                this.close(response);
                this.close(connection);
                throw throwable;
            }
        }
    }

    @Override
    public void keepalive(String handle) {
        if (log.isDebugEnabled()) {
            log.debug("keepalive: [ {} ]", (Object)handle);
        }
        TransportConnection connection = null;
        CloseableHttpResponse response = null;
        try {
            connection = new TransportConnection(this.endpoint, this.timeout);
            response = connection.post(this.token, this.path + "/" + handle + "/keepalive", null, null);
            this.close(response);
            this.close(connection);
        }
        catch (Exception ex) {
            try {
                throw new SignalFlowException("failed to set keepalive for program - " + handle, ex);
            }
            catch (Throwable throwable) {
                this.close(response);
                this.close(connection);
                throw throwable;
            }
        }
    }

    @Override
    public void close(int code, String reason) {
    }

    private void close(CloseableHttpResponse response) {
        try {
            if (response != null) {
                response.close();
            }
        }
        catch (IOException ioex) {
            log.error("error closing response", (Throwable)ioex);
        }
    }

    private void close(TransportConnection connection) {
        try {
            if (connection != null) {
                connection.close();
            }
        }
        catch (IOException ioex) {
            log.error("error closing transport connection", (Throwable)ioex);
        }
    }

    public static class TransportEventStreamParser
    implements Iterator<StreamMessage>,
    Closeable {
        protected static final Logger log = LoggerFactory.getLogger(TransportEventStreamParser.class);
        private static final String EVENT = "event";
        private static final String ID = "id";
        private static final String DATA = "data";
        private static final String RETRY = "retry";
        private static final String DEFAULT_EVENT = "message";
        private static final String EMPTY_STRING = "";
        private static final Pattern DIGITS_ONLY = Pattern.compile("^[\\d]+$");
        private BufferedReader eventStreamReader;
        private boolean endOfStreamReached = false;
        private int reconnectionTimeoutMs = 1000;
        private StreamMessage nextMessage;
        private String lastEventId;
        private String eventNameBuffer = "message";
        private StringBuilder dataBuffer = new StringBuilder();

        public TransportEventStreamParser(InputStream eventStream) throws UnsupportedEncodingException {
            this.eventStreamReader = new BufferedReader(new InputStreamReader(eventStream, "UTF-8"));
        }

        public String getLastEventId() {
            return this.lastEventId;
        }

        public int getReconnectionTimeoutMs() {
            return this.reconnectionTimeoutMs;
        }

        @Override
        public boolean hasNext() {
            while (!this.endOfStreamReached && this.eventStreamReader != null && this.nextMessage == null) {
                this.parseNext();
            }
            return this.nextMessage != null;
        }

        @Override
        public StreamMessage next() {
            while (!this.endOfStreamReached && this.eventStreamReader != null && this.nextMessage == null) {
                this.parseNext();
            }
            if (this.nextMessage != null) {
                StreamMessage message = this.nextMessage;
                this.nextMessage = null;
                return message;
            }
            throw new NoSuchElementException("no more stream messages");
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("remove from stream not supported");
        }

        @Override
        public void close() {
            if (this.eventStreamReader != null) {
                try {
                    this.eventStreamReader.close();
                    this.eventStreamReader = null;
                }
                catch (IOException ex) {
                    log.error("failed to close event stream", (Throwable)ex);
                }
            }
        }

        private void parseNext() {
            if (this.eventStreamReader != null) {
                try {
                    String line;
                    long startTime = System.currentTimeMillis();
                    this.dataBuffer.setLength(0);
                    while ((line = this.eventStreamReader.readLine()) != null && !line.trim().isEmpty()) {
                        if (line.startsWith(":")) continue;
                        int colonIndex = line.indexOf(":");
                        if (colonIndex != -1) {
                            String field = line.substring(0, colonIndex);
                            String value = line.substring(colonIndex + 1).replaceFirst(" ", EMPTY_STRING);
                            this.processField(field, value);
                            continue;
                        }
                        this.processField(line.trim(), EMPTY_STRING);
                    }
                    if (line == null) {
                        this.endOfStreamReached = true;
                        this.close();
                    }
                    if (this.dataBuffer.length() > 0) {
                        String data = this.dataBuffer.toString();
                        if (data.endsWith("\n")) {
                            data = data.substring(0, data.length() - 1);
                        }
                        this.nextMessage = new StreamMessage(this.eventNameBuffer, this.lastEventId, data);
                    } else {
                        log.debug(this.eventNameBuffer.toString());
                        this.eventNameBuffer = EMPTY_STRING;
                        this.nextMessage = null;
                    }
                    log.debug("total stream message read/parse time (ms): {}", (Object)(System.currentTimeMillis() - startTime));
                }
                catch (IOException ex) {
                    log.error("failed to parse next stream event", (Throwable)ex);
                    throw new SignalFlowException("failed to parse next stream event", ex);
                }
            } else {
                this.nextMessage = null;
            }
        }

        private void processField(String field, String value) {
            if (DATA.equals(field)) {
                this.dataBuffer.append(value).append("\n");
            } else if (ID.equals(field)) {
                this.lastEventId = value;
            } else if (EVENT.equals(field)) {
                this.eventNameBuffer = value;
            } else if (RETRY.equals(field) && DIGITS_ONLY.matcher(value).matches()) {
                this.reconnectionTimeoutMs = Integer.parseInt(value);
            }
        }
    }

    public static class TransportChannel
    extends Channel {
        protected static final Logger log = LoggerFactory.getLogger(TransportChannel.class);
        private TransportConnection connection;
        private CloseableHttpResponse response;
        private HttpEntity responseHttpEntity;
        private TransportEventStreamParser streamParser;

        public TransportChannel(TransportConnection connection, CloseableHttpResponse response) throws IOException {
            this.connection = connection;
            this.response = response;
            this.responseHttpEntity = response.getEntity();
            this.streamParser = new TransportEventStreamParser(this.responseHttpEntity.getContent());
            this.iterator = this.streamParser;
            log.debug("constructed {} of type {}", (Object)this, (Object)this.getClass().getName());
        }

        @Override
        public void close() {
            super.close();
            try {
                this.response.close();
            }
            catch (IOException ex) {
                log.error("failed to close response", (Throwable)ex);
            }
            try {
                this.connection.close();
            }
            catch (IOException ex) {
                log.error("failed to close connection", (Throwable)ex);
            }
            this.streamParser.close();
        }
    }

    public static class TransportConnection
    extends AbstractHttpReceiverConnection {
        protected static final Logger log = LoggerFactory.getLogger(TransportConnection.class);
        public static final int DEFAULT_TIMEOUT_MS = 1000;
        protected final RequestConfig transportRequestConfig;

        public TransportConnection(SignalFxEndpoint endpoint) {
            this(endpoint, 1000);
        }

        public TransportConnection(SignalFxEndpoint endpoint, int timeoutMs) {
            super(endpoint, timeoutMs, new BasicHttpClientConnectionManager());
            this.transportRequestConfig = RequestConfig.custom().setSocketTimeout(0).setConnectionRequestTimeout(this.requestConfig.getConnectionRequestTimeout()).setConnectTimeout(this.requestConfig.getConnectTimeout()).setProxy(this.requestConfig.getProxy()).build();
            log.debug("constructed request config: {}", (Object)this.transportRequestConfig.toString());
        }

        public CloseableHttpResponse post(String token, String path, Map<String, String> parameters, String body) throws SignalFlowException {
            HttpPost httpPost = null;
            try {
                CloseableHttpResponse response;
                StatusLine statusLine;
                int statuscode;
                ArrayList<NameValuePair> params = new ArrayList<NameValuePair>();
                if (parameters != null) {
                    for (Map.Entry<String, String> entry : parameters.entrySet()) {
                        params.add(new BasicNameValuePair(entry.getKey(), entry.getValue()));
                    }
                }
                URIBuilder uriBuilder = new URIBuilder(String.format("%s%s", this.host.toURI(), path));
                uriBuilder.addParameters(params);
                httpPost = new HttpPost(uriBuilder.build());
                httpPost.setConfig(this.transportRequestConfig);
                httpPost.setHeader("X-SF-TOKEN", token);
                httpPost.setHeader("User-Agent", "SignalFx-java-client/1.0.0");
                httpPost.setHeader("Content-Type", "text/plain");
                if (body != null) {
                    StringEntity httpEntity = new StringEntity(body);
                    httpPost.setEntity(httpEntity);
                }
                if (log.isDebugEnabled()) {
                    log.debug(httpPost.toString());
                }
                if ((statuscode = (statusLine = (response = this.client.execute(httpPost)).getStatusLine()).getStatusCode()) < 200 || statuscode >= 300) {
                    try {
                        response.close();
                    }
                    catch (IOException ex) {
                        log.error("failed to close response", (Throwable)ex);
                    }
                    String errorMessage = statusLine.getStatusCode() + ": failed post [ " + httpPost + " ] reason: " + statusLine.getReasonPhrase();
                    throw new SignalFlowException(statusLine.getStatusCode(), errorMessage);
                }
                return response;
            }
            catch (IOException ex) {
                throw new SignalFlowException("failed communication. " + ex.getMessage(), ex);
            }
            catch (URISyntaxException ex) {
                throw new SignalFlowException("invalid uri. " + ex.getMessage(), ex);
            }
        }

        public void close() throws IOException {
            this.client.close();
        }
    }

    public static class TransportBuilder {
        private String token;
        private String protocol = "https";
        private String host = "stream.signalfx.com";
        private int port = 443;
        private int timeout = 1;
        private int version = 2;

        public TransportBuilder(String token) {
            this.token = token;
        }

        public TransportBuilder setProtocol(String protocol) {
            this.protocol = protocol;
            return this;
        }

        public TransportBuilder setHost(String host) {
            this.host = host;
            return this;
        }

        public TransportBuilder setPort(int port) {
            this.port = port;
            return this;
        }

        public TransportBuilder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public TransportBuilder setAPIVersion(int version) {
            this.version = version;
            return this;
        }

        public ServerSentEventsTransport build() {
            SignalFxEndpoint endpoint = new SignalFxEndpoint(this.protocol, this.host, this.port);
            ServerSentEventsTransport transport = new ServerSentEventsTransport(this.token, endpoint, this.version, this.timeout * 1000);
            return transport;
        }
    }
}

