/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.metadata.feeds;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.CharBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Connects to a remote socket and forwards every newline-terminated message it
 * receives to a caller-supplied queue.
 *
 * <p>{@link #start()} submits a background task that opens the connection and
 * reads from it; {@link #stop()} shuts the executor down and closes the
 * connection so the blocked reader returns. Each message is enqueued with a
 * single trailing {@code "\n"}.
 *
 * <p>Bytes are widened to {@code char} one-to-one (equivalent to ISO-8859-1
 * decoding); no other charset decoding is performed.
 */
public class RemoteSocketMessageListener {
    private static final Logger LOGGER = Logger.getLogger(RemoteSocketMessageListener.class.getName());

    /** Message terminator on the wire. */
    private static final char EOL = '\n';

    /** Initial capacity of the per-connection message buffer; it grows on demand. */
    private static final int INITIAL_MESSAGE_CAPACITY = 5000;

    private final String host;
    private final int port;
    private final LinkedBlockingQueue<String> outbox;
    private final ExecutorService executorService = Executors.newFixedThreadPool(10);
    private RemoteMessageListenerServer listenerServer;

    /**
     * @param host   remote host to connect to
     * @param port   remote port to connect to
     * @param outbox queue that receives every complete message, each followed by {@code "\n"}
     */
    public RemoteSocketMessageListener(String host, int port, LinkedBlockingQueue<String> outbox) {
        this.host = host;
        this.port = port;
        this.outbox = outbox;
    }

    /**
     * Shuts the executor down and closes the remote connection, if any.
     * Safe to call before {@link #start()} and safe to call more than once.
     */
    public void stop() {
        if (!this.executorService.isShutdown()) {
            this.executorService.shutdownNow();
        }
        // Interrupting the worker does not unblock a socket read; closing the
        // socket does. The guard covers stop() being called before start().
        if (this.listenerServer != null) {
            this.listenerServer.stop();
        }
    }

    /**
     * Submits the background task that connects to the remote endpoint and
     * pumps its messages into the outbox.
     *
     * <p>The connection is opened on the executor thread, so connection
     * failures are logged there rather than thrown from this method.
     *
     * @throws IOException never thrown by this implementation; the declaration
     *                     is retained for compatibility with existing callers
     */
    public void start() throws IOException {
        this.listenerServer = new RemoteMessageListenerServer(this.host, this.port, this.outbox);
        this.executorService.execute(this.listenerServer);
    }

    /**
     * Reads one message from {@code in}, up to but excluding the terminator.
     *
     * <p>The value returned by {@code read()} is compared against -1 while it
     * is still an {@code int}; narrowing it to {@code char} first would turn
     * the end-of-stream marker into U+FFFF and make it undetectable.
     *
     * @param in      stream to read from
     * @param scratch reusable buffer; cleared on entry
     * @return the message without its terminator (possibly empty), or
     *         {@code null} once the stream has ended; an unterminated trailing
     *         fragment is discarded
     * @throws IOException if reading from the stream fails
     */
    private static String readMessage(InputStream in, StringBuilder scratch) throws IOException {
        scratch.setLength(0);
        int next;
        while ((next = in.read()) != -1) {
            if (next == EOL) {
                return scratch.toString();
            }
            scratch.append((char) next);
        }
        if (scratch.length() > 0 && LOGGER.isLoggable(Level.WARNING)) {
            LOGGER.warning("Discarding incomplete message of " + scratch.length()
                    + " characters: stream ended before end-of-line");
        }
        return null;
    }

    /**
     * Closes {@code socket} if it is non-null and still open, logging instead
     * of propagating any failure.
     */
    private static void closeSocket(Socket socket) {
        if (socket == null || socket.isClosed()) {
            return;
        }
        try {
            socket.close();
        }
        catch (IOException e) {
            LOGGER.log(Level.WARNING, "Failed to close " + socket, e);
        }
    }

    /** Supplies the queue that parsed messages are appended to. */
    public static interface IMessageAnalyzer {
        /** @return the queue that receives parsed messages */
        public LinkedBlockingQueue<String> getMessageQueue();
    }

    /**
     * Reads newline-terminated messages from an already-connected client
     * socket and appends them to the analyzer's queue until the stream ends.
     * The socket is closed when the task finishes.
     */
    private static class MessageParser
    implements Runnable {
        private final Socket client;
        private final IMessageAnalyzer messageAnalyzer;

        public MessageParser(Socket client, IMessageAnalyzer messageAnalyzer) {
            this.client = client;
            this.messageAnalyzer = messageAnalyzer;
        }

        @Override
        public void run() {
            StringBuilder scratch = new StringBuilder(INITIAL_MESSAGE_CAPACITY);
            try {
                InputStream in = new BufferedInputStream(this.client.getInputStream());
                String message;
                while ((message = readMessage(in, scratch)) != null) {
                    synchronized (this.messageAnalyzer) {
                        this.messageAnalyzer.getMessageQueue().add(message + "\n");
                    }
                }
            }
            catch (IOException ioe) {
                LOGGER.log(Level.WARNING, "Failed to read messages from " + this.client, ioe);
            }
            finally {
                closeSocket(this.client);
            }
        }
    }

    /**
     * Opens a connection to {@code host:port} and appends every
     * newline-terminated message received on it to the outbox until the stream
     * ends, an error occurs, or {@link #stop()} is called.
     */
    private static class RemoteMessageListenerServer
    implements Runnable {
        private final String host;
        private final int port;
        private final LinkedBlockingQueue<String> outbox;
        // Written by the worker thread in run(), read by the thread calling stop().
        private volatile Socket client;
        private volatile boolean stopped;

        public RemoteMessageListenerServer(String host, int port, LinkedBlockingQueue<String> outbox) {
            this.host = host;
            this.port = port;
            this.outbox = outbox;
        }

        /**
         * Requests termination and closes the connection if one is open, which
         * makes a blocked read in {@link #run()} fail and the task exit.
         */
        public void stop() {
            // Set the flag before reading the socket: either this thread sees
            // the socket published by run(), or run() sees the flag.
            this.stopped = true;
            closeSocket(this.client);
        }

        @Override
        public void run() {
            Socket socket = null;
            try {
                socket = new Socket(this.host, this.port);
                this.client = socket;
                if (this.stopped) {
                    // stop() ran before the socket was published; the finally block closes it.
                    return;
                }
                InputStream in = new BufferedInputStream(socket.getInputStream());
                StringBuilder scratch = new StringBuilder(INITIAL_MESSAGE_CAPACITY);
                String message;
                while ((message = readMessage(in, scratch)) != null) {
                    // NOTE(review): LinkedBlockingQueue is already thread-safe; the lock is
                    // kept in case consumers synchronize on the queue - confirm and drop.
                    synchronized (this.outbox) {
                        this.outbox.add(message + "\n");
                    }
                }
            }
            catch (Exception e) {
                // A failure caused by stop() closing the socket is expected; stay quiet.
                if (!this.stopped && LOGGER.isLoggable(Level.WARNING)) {
                    String what = socket == null
                            ? "Unable to start Remote Message listener for "
                            : "Remote Message listener lost its connection to ";
                    LOGGER.log(Level.WARNING, what + this.host + ":" + this.port, e);
                }
            }
            finally {
                closeSocket(socket);
            }
        }
    }
}

