/*
 * Decompiled with CFR 0.152.
 */
package org.apache.asterix.metadata.feeds;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.CharBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.feeds.api.IMessageReceiver;

public class SocketMessageListener {
    private static final Logger LOGGER = Logger.getLogger(SocketMessageListener.class.getName());
    private final int port;
    private final IMessageReceiver<String> messageReceiver;
    private final MessageListenerServer listenerServer;
    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    public SocketMessageListener(int port, IMessageReceiver<String> messageReceiver) {
        this.port = port;
        this.messageReceiver = messageReceiver;
        this.listenerServer = new MessageListenerServer(port, messageReceiver);
    }

    public void stop() throws IOException {
        this.listenerServer.stop();
        this.messageReceiver.close(false);
        if (!this.executorService.isShutdown()) {
            this.executorService.shutdownNow();
        }
    }

    public void start() {
        this.messageReceiver.start();
        this.executorService.execute(this.listenerServer);
    }

    private static class MessageListenerServer
    implements Runnable {
        private final int port;
        private final IMessageReceiver<String> messageReceiver;
        private ServerSocket server;
        private final Executor executor;

        public MessageListenerServer(int port, IMessageReceiver<String> messageReceiver) {
            this.port = port;
            this.messageReceiver = messageReceiver;
            this.executor = Executors.newCachedThreadPool();
        }

        public void stop() throws IOException {
            this.server.close();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Socket client = null;
            try {
                try {
                    this.server = new ServerSocket(this.port);
                    while (true) {
                        client = this.server.accept();
                        ClientHandler handler = new ClientHandler(client, this.messageReceiver);
                        this.executor.execute(handler);
                    }
                }
                catch (Exception e) {
                    if (LOGGER.isLoggable(Level.WARNING)) {
                        LOGGER.warning("Unable to start Message listener" + this.server);
                    }
                    if (this.server != null) {
                        try {
                            this.server.close();
                        }
                        catch (Exception e2) {
                            e2.printStackTrace();
                        }
                    }
                }
            }
            catch (Throwable throwable) {
                if (this.server != null) {
                    try {
                        this.server.close();
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                throw throwable;
            }
        }

        private static class ClientHandler
        implements Runnable {
            private static final char EOL = (char)"\n".getBytes()[0];
            private final Socket client;
            private final IMessageReceiver<String> messageReceiver;

            public ClientHandler(Socket client, IMessageReceiver<String> messageReceiver) {
                this.client = client;
                this.messageReceiver = messageReceiver;
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    char ch;
                    InputStream in = this.client.getInputStream();
                    CharBuffer buffer = CharBuffer.allocate(5000);
                    while ((ch = (char)in.read()) != '\uffffffff') {
                        while (ch != EOL) {
                            buffer.put(ch);
                            ch = (char)in.read();
                        }
                        buffer.flip();
                        String s = new String(buffer.array(), 0, buffer.limit());
                        this.messageReceiver.sendMessage((Object)(s + "\n"));
                        buffer.position(0);
                        buffer.limit(5000);
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                    if (LOGGER.isLoggable(Level.WARNING)) {
                        LOGGER.warning("Unable to process mesages from client" + this.client);
                    }
                }
                finally {
                    if (this.client != null) {
                        try {
                            this.client.close();
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
}

