/*
 * Decompiled with CFR 0.152.
 */
package org.kaazing.net.sse.impl;

import java.io.IOException;
import java.net.URI;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.kaazing.net.http.HttpRedirectPolicy;
import org.kaazing.net.impl.util.BlockingQueueImpl;
import org.kaazing.net.sse.SseEventReader;
import org.kaazing.net.sse.SseEventSource;
import org.kaazing.net.sse.SseException;
import org.kaazing.net.sse.impl.SseEventReaderImpl;
import org.kaazing.net.sse.impl.SseEventStream;
import org.kaazing.net.sse.impl.SseEventStreamListener;
import org.kaazing.net.sse.impl.SsePayload;

public class SseEventSourceImpl
extends SseEventSource {
    private static final String _CLASS_NAME = SseEventSourceImpl.class.getName();
    private static final Logger _LOG = Logger.getLogger(_CLASS_NAME);
    private SseEventStream _eventStream;
    private ReadyState _readyState;
    private SseException _exception;
    private SseEventReaderImpl _eventReader;
    private BlockingQueueImpl<Object> _sharedQueue;
    private URI _location;
    private HttpRedirectPolicy _redirectOption;
    private long _retryTimeout = 3000L;
    private SseEventStreamListener _eventStreamListener = new SseEventStreamListener(){

        @Override
        public void streamOpened() {
            _LOG.entering(_CLASS_NAME, "streamOpened");
            SseEventSourceImpl.this.connectionOpened();
        }

        @Override
        public void messageReceived(String eventName, String message) {
            _LOG.entering(_CLASS_NAME, "messageReceived", message);
            SseEventSourceImpl.this.messageArrived(eventName, message);
        }

        @Override
        public void streamErrored(Exception exception) {
            _LOG.entering(_CLASS_NAME, "streamErrored");
            SseEventSourceImpl.this.connectionFailed(exception);
        }
    };

    public SseEventSourceImpl(URI location) {
        URI loc = location;
        if (location.getScheme().equalsIgnoreCase("sse")) {
            String fragment = location.getFragment();
            String schemeSpecificPart = location.getSchemeSpecificPart();
            if (fragment == null) {
                fragment = "";
            }
            loc = URI.create("http:" + schemeSpecificPart + fragment);
        }
        this._location = loc;
        this._readyState = ReadyState.CLOSED;
        this._sharedQueue = new BlockingQueueImpl();
    }

    @Override
    public synchronized void close() throws IOException {
        _LOG.entering(_CLASS_NAME, "close");
        if (this._readyState == ReadyState.CLOSED || this._readyState == ReadyState.CLOSING) {
            _LOG.log(Level.FINE, "Event source is not connected");
            return;
        }
        this.setException(null);
        this._readyState = ReadyState.CLOSING;
        this._eventStream.stop();
        this._readyState = ReadyState.CLOSED;
        this.cleanupAfterClose();
    }

    @Override
    public synchronized void connect() throws IOException {
        _LOG.entering(_CLASS_NAME, "connect");
        if (this._readyState != ReadyState.CLOSED) {
            String s = "Event source must be closed before connecting";
            throw new SseException(s);
        }
        this._eventStream = new SseEventStream(this._location.toString());
        this._eventStream.setListener(this._eventStreamListener);
        this._eventStream.setRetryTimeout(this._retryTimeout);
        if (this._eventReader != null) {
            this._eventReader.reset();
        }
        this._readyState = ReadyState.CONNECTING;
        this.setException(null);
        this._eventStream.connect();
        SseException exception = this.getException();
        if (exception != null) {
            throw exception;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public SseEventReader getEventReader() throws IOException {
        if (this._readyState != ReadyState.OPEN) {
            String s = "Cannot get the SseEventReader as the event source is not yet connected";
            throw new IOException(s);
        }
        SseEventSourceImpl sseEventSourceImpl = this;
        synchronized (sseEventSourceImpl) {
            if (this._eventReader != null) {
                return this._eventReader;
            }
            this._eventReader = new SseEventReaderImpl(this, this._sharedQueue);
        }
        return this._eventReader;
    }

    @Override
    public HttpRedirectPolicy getFollowRedirect() {
        return this._redirectOption;
    }

    @Override
    public long getRetryTimeout() {
        return this._retryTimeout;
    }

    @Override
    public void setFollowRedirect(HttpRedirectPolicy redirectOption) {
        this._redirectOption = redirectOption;
    }

    @Override
    public void setRetryTimeout(long millis) {
        this._retryTimeout = millis;
        if (this._eventStream != null) {
            this._eventStream.setRetryTimeout(millis);
        }
    }

    public SseException getException() {
        return this._exception;
    }

    public void setException(SseException exception) {
        this._exception = exception;
    }

    public boolean isConnected() {
        return this._readyState == ReadyState.OPEN;
    }

    public boolean isDisconnected() {
        return this._readyState == ReadyState.CLOSED;
    }

    public BlockingQueueImpl<Object> getSharedQueue() {
        return this._sharedQueue;
    }

    private synchronized void connectionOpened() {
        this._readyState = ReadyState.OPEN;
        this.notifyAll();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void messageArrived(String eventName, String data) {
        if (this._readyState != ReadyState.OPEN) {
            return;
        }
        BlockingQueueImpl<Object> blockingQueueImpl = this._sharedQueue;
        synchronized (blockingQueueImpl) {
            try {
                this._sharedQueue.put(new SsePayload(eventName, data));
            }
            catch (InterruptedException ex) {
                _LOG.log(Level.INFO, ex.getMessage(), ex);
            }
        }
    }

    private synchronized void connectionClosed(String reason) {
        this._readyState = ReadyState.CLOSED;
        if (reason != null) {
            this.setException(new SseException(reason));
        }
        this.cleanupAfterClose();
        this.notifyAll();
    }

    private synchronized void connectionFailed(Exception exception) {
        SseException ex = null;
        ex = exception == null ? new SseException("Connection Failed") : new SseException(exception);
        this.setException(ex);
        this._readyState = ReadyState.CLOSED;
        this.cleanupAfterClose();
        this.notifyAll();
    }

    private synchronized void cleanupAfterClose() {
        if (this._eventReader != null) {
            try {
                this._eventReader.close();
            }
            catch (IOException ex) {
                _LOG.log(Level.FINE, ex.getMessage(), ex);
            }
        } else {
            this.getSharedQueue().done();
        }
    }

    static enum ReadyState {
        CONNECTING,
        OPEN,
        CLOSING,
        CLOSED;

    }
}

