/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap;

import com.google.common.base.Preconditions;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.ChannelOutputStream;
import org.apache.hadoop.hive.llap.LlapOutputFormat;
import org.apache.hadoop.hive.llap.LlapRecordWriter;
import org.apache.hadoop.mapred.RecordWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LlapOutputFormatService {
    private static final Logger LOG = LoggerFactory.getLogger(LlapOutputFormat.class);
    private static final AtomicBoolean started = new AtomicBoolean(false);
    private static final AtomicBoolean initing = new AtomicBoolean(false);
    private static LlapOutputFormatService INSTANCE;
    private final Map<String, RecordWriter> writers = new HashMap<String, RecordWriter>();
    private final Configuration conf;
    private static final int WAIT_TIME = 5;
    private static final int MAX_QUERY_ID_LENGTH = 256;
    private EventLoopGroup eventLoopGroup;
    private ServerBootstrap serverBootstrap;
    private ChannelFuture listeningChannelFuture;
    private int port;

    private LlapOutputFormatService(Configuration conf) throws IOException {
        this.conf = conf;
    }

    public static void initializeAndStart(Configuration conf) throws Exception {
        if (!initing.getAndSet(true)) {
            INSTANCE = new LlapOutputFormatService(conf);
            INSTANCE.start();
            started.set(true);
        }
    }

    public static LlapOutputFormatService get() throws IOException {
        Preconditions.checkState(started.get(), "LlapOutputFormatService must be started before invoking get");
        return INSTANCE;
    }

    public void start() throws IOException {
        LOG.info("Starting LlapOutputFormatService");
        int portFromConf = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
        this.eventLoopGroup = new NioEventLoopGroup(1);
        this.serverBootstrap = new ServerBootstrap();
        this.serverBootstrap.group(this.eventLoopGroup);
        this.serverBootstrap.channel(NioServerSocketChannel.class);
        this.serverBootstrap.childHandler((ChannelHandler)new LlapOutputFormatServiceChannelHandler());
        try {
            this.listeningChannelFuture = this.serverBootstrap.bind(portFromConf).sync();
            this.port = ((InetSocketAddress)this.listeningChannelFuture.channel().localAddress()).getPort();
            LOG.info("LlapOutputFormatService: Binding to port " + this.port);
        }
        catch (InterruptedException err) {
            throw new IOException("LlapOutputFormatService: Error binding to port " + portFromConf, err);
        }
    }

    public void stop() throws IOException, InterruptedException {
        LOG.info("Stopping LlapOutputFormatService");
        if (this.listeningChannelFuture != null) {
            this.listeningChannelFuture.channel().close().sync();
            this.listeningChannelFuture = null;
        } else {
            LOG.warn("LlapOutputFormatService does not appear to have a listening port to close.");
        }
        Future terminationFuture = this.eventLoopGroup.shutdownGracefully(1L, 5L, TimeUnit.SECONDS);
        terminationFuture.sync();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <K, V> RecordWriter<K, V> getWriter(String id) throws IOException, InterruptedException {
        RecordWriter writer = null;
        LlapOutputFormatService llapOutputFormatService = INSTANCE;
        synchronized (llapOutputFormatService) {
            while ((writer = this.writers.get(id)) == null) {
                LOG.info("Waiting for writer for: " + id);
                INSTANCE.wait();
            }
        }
        LOG.info("Returning writer for: " + id);
        return writer;
    }

    public int getPort() {
        return this.port;
    }

    protected class LlapOutputFormatServiceChannelHandler
    extends ChannelInitializer<SocketChannel> {
        protected LlapOutputFormatServiceChannelHandler() {
        }

        public void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new ChannelHandler[]{new DelimiterBasedFrameDecoder(256, Delimiters.nulDelimiter()), new StringDecoder(), new StringEncoder(), new LlapOutputFormatServiceHandler()});
        }
    }

    protected class LlapOutputFormatChannelCloseListener
    implements ChannelFutureListener {
        private String id;

        LlapOutputFormatChannelCloseListener(String id) {
            this.id = id;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void operationComplete(ChannelFuture future) throws Exception {
            RecordWriter writer = null;
            LlapOutputFormatService llapOutputFormatService = INSTANCE;
            synchronized (llapOutputFormatService) {
                writer = (RecordWriter)LlapOutputFormatService.this.writers.remove(this.id);
            }
            if (writer == null) {
                LOG.warn("Did not find a writer for ID " + this.id);
            }
        }
    }

    protected class LlapOutputFormatServiceHandler
    extends SimpleChannelInboundHandler<String> {
        protected LlapOutputFormatServiceHandler() {
        }

        public void channelRead0(ChannelHandlerContext ctx, String msg) {
            String id = msg;
            this.registerReader(ctx, id);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void registerReader(ChannelHandlerContext ctx, String id) {
            LlapOutputFormatService llapOutputFormatService = INSTANCE;
            synchronized (llapOutputFormatService) {
                LOG.debug("registering socket for: " + id);
                int bufSize = 131072;
                ChannelOutputStream stream = new ChannelOutputStream(ctx, id, bufSize);
                LlapRecordWriter writer = new LlapRecordWriter(stream);
                LlapOutputFormatService.this.writers.put(id, writer);
                ctx.channel().closeFuture().addListener((GenericFutureListener)new LlapOutputFormatChannelCloseListener(id));
                INSTANCE.notifyAll();
            }
        }
    }
}

