/*
 * Decompiled with CFR 0.152.
 */
package netflix.karyon;

import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.server.RxServer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;

public class ShutdownListener {
    private static final Logger logger = LoggerFactory.getLogger(ShutdownListener.class);
    private final RxServer<String, String> shutdownCmdServer;

    public ShutdownListener(int shutdownPort, Func1<String, Observable<Void>> commandHandler) {
        this.shutdownCmdServer = RxNetty.createTcpServer((int)shutdownPort, (PipelineConfigurator)PipelineConfigurators.stringMessageConfigurator(), (ConnectionHandler)new ShutdownConnectionHandler(commandHandler));
    }

    public ShutdownListener(int shutdownPort, Action0 shutdownAction) {
        this(shutdownPort, new DefaultCommandHandler(shutdownAction));
    }

    public int getShutdownPort() {
        return this.shutdownCmdServer.getServerPort();
    }

    public void start() {
        this.shutdownCmdServer.start();
    }

    public void shutdown() throws InterruptedException {
        this.shutdownCmdServer.shutdown();
    }

    private class ShutdownConnectionHandler
    implements ConnectionHandler<String, String> {
        private final Func1<String, Observable<Void>> commandHandler;

        public ShutdownConnectionHandler(Func1<String, Observable<Void>> commandHandler) {
            this.commandHandler = commandHandler;
        }

        public Observable<Void> handle(ObservableConnection<String, String> conn) {
            return conn.getInput().take(1).doOnNext((Action1)new Action1<String>(){

                public void call(String s) {
                    logger.info("Received a command: " + s);
                }
            }).flatMap(this.commandHandler).doOnCompleted(new Action0(){

                public void call() {
                    try {
                        ShutdownListener.this.shutdown();
                    }
                    catch (InterruptedException e) {
                        logger.error("Interrupted while shutting down the shutdown command listener.");
                    }
                }
            });
        }
    }

    private static class DefaultCommandHandler
    implements Func1<String, Observable<Void>> {
        private final ExecutorService shutdownExec;
        private final Action0 shutdownAction;

        public DefaultCommandHandler(Action0 shutdownAction) {
            this.shutdownAction = shutdownAction;
            this.shutdownExec = Executors.newFixedThreadPool(1);
        }

        public Observable<Void> call(String cmd) {
            if ("shutdown".equalsIgnoreCase(cmd)) {
                return this.shutdownAsync();
            }
            return Observable.error((Throwable)new UnsupportedOperationException("Unknown command: " + cmd));
        }

        private Observable<Void> shutdownAsync() {
            final Future<Void> submitFuture = this.shutdownExec.submit(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    DefaultCommandHandler.this.shutdownAction.call();
                    return null;
                }
            });
            return Observable.interval((long)10L, (TimeUnit)TimeUnit.SECONDS).take(1).map((Func1)new Func1<Long, Void>(){

                public Void call(Long aLong) {
                    block6: {
                        logger.info("Checking if shutdown is done..");
                        if (submitFuture.isDone()) {
                            try {
                                submitFuture.get();
                                logger.info("Shutdown is done..");
                            }
                            catch (InterruptedException e) {
                                logger.info("Shutdown returned error. ", (Throwable)e);
                            }
                            catch (ExecutionException e) {
                                if (e.getCause() instanceof IllegalStateException) {
                                    logger.info("Server already shutdown. ", (Throwable)e);
                                    break block6;
                                }
                                logger.info("Shutdown returned error. ", (Throwable)e);
                            }
                        } else {
                            logger.debug("Shutdown not yet done.");
                        }
                    }
                    return null;
                }
            });
        }
    }
}

